You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jw...@apache.org on 2020/05/04 18:18:46 UTC
[cassandra] branch trunk updated: Improve logging when mutation
passed to commit log is too large
This is an automated email from the ASF dual-hosted git repository.
jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d3dadcd Improve logging when mutation passed to commit log is too large
d3dadcd is described below
commit d3dadcd6f3bbde471e972f8332eb62de0f2d4aae
Author: nvharikrishna <n....@gmail.com>
AuthorDate: Wed Jan 8 22:06:06 2020 +0530
Improve logging when mutation passed to commit log is too large
Patch by Venkata Harikrishna Nukala; reviewed by Jordan West for CASSANDRA-14781
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/batchlog/Batch.java | 4 +-
.../org/apache/cassandra/db/CounterMutation.java | 39 +++++++++-
src/java/org/apache/cassandra/db/IMutation.java | 13 ++++
src/java/org/apache/cassandra/db/Mutation.java | 36 +++++++++
.../db/MutationExceededMaxSizeException.java | 89 ++++++++++++++++++++++
.../apache/cassandra/db/commitlog/CommitLog.java | 14 +---
.../cassandra/db/virtual/VirtualMutation.java | 5 ++
src/java/org/apache/cassandra/hints/Hint.java | 2 +-
.../apache/cassandra/schema/MigrationManager.java | 2 +-
.../service/reads/repair/BlockingReadRepairs.java | 61 +++++++--------
.../cassandra/test/microbench/MutationBench.java | 2 +-
.../db/MutationExceededMaxSizeExceptionTest.java | 46 +++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 55 +++++++++++--
14 files changed, 316 insertions(+), 53 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 89c8d7d..8c4cf35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
* replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
* Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
* Avoid race condition when completing stream sessions (CASSANDRA-15666)
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
index e91e3ca..fb6c5d5 100644
--- a/src/java/org/apache/cassandra/batchlog/Batch.java
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -90,7 +90,7 @@ public final class Batch
size += sizeofUnsignedVInt(batch.decodedMutations.size());
for (Mutation mutation : batch.decodedMutations)
{
- int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+ int mutationSize = mutation.serializedSize(version);
size += sizeofUnsignedVInt(mutationSize);
size += mutationSize;
}
@@ -108,7 +108,7 @@ public final class Batch
out.writeUnsignedVInt(batch.decodedMutations.size());
for (Mutation mutation : batch.decodedMutations)
{
- out.writeUnsignedVInt(Mutation.serializer.serializedSize(mutation, version));
+ out.writeUnsignedVInt(mutation.serializedSize(version));
Mutation.serializer.serialize(mutation, out, version);
}
}
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index bb10a6a..722ad73 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -45,6 +45,9 @@ import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.btree.BTreeSet;
import static java.util.concurrent.TimeUnit.*;
+import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
+import static org.apache.cassandra.net.MessagingService.VERSION_40;
public class CounterMutation implements IMutation
{
@@ -76,6 +79,15 @@ public class CounterMutation implements IMutation
return mutation.getPartitionUpdates();
}
+ public void validateSize(int version, int overhead)
+ {
+ long totalSize = serializedSize(version) + overhead;
+ if(totalSize > MAX_MUTATION_SIZE)
+ {
+ throw new MutationExceededMaxSizeException(this, version, totalSize);
+ }
+ }
+
public Mutation getMutation()
{
return mutation;
@@ -308,6 +320,31 @@ public class CounterMutation implements IMutation
return DatabaseDescriptor.getCounterWriteRpcTimeout(unit);
}
+ private int serializedSize30;
+ private int serializedSize3014;
+ private int serializedSize40;
+
+ public int serializedSize(int version)
+ {
+ switch (version)
+ {
+ case VERSION_30:
+ if (serializedSize30 == 0)
+ serializedSize30 = (int) serializer.serializedSize(this, VERSION_30);
+ return serializedSize30;
+ case VERSION_3014:
+ if (serializedSize3014 == 0)
+ serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014);
+ return serializedSize3014;
+ case VERSION_40:
+ if (serializedSize40 == 0)
+ serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
+ return serializedSize40;
+ default:
+ throw new IllegalStateException("Unknown serialization version: " + version);
+ }
+ }
+
@Override
public String toString()
{
@@ -336,7 +373,7 @@ public class CounterMutation implements IMutation
public long serializedSize(CounterMutation cm, int version)
{
- return Mutation.serializer.serializedSize(cm.mutation, version)
+ return cm.mutation.serializedSize(version)
+ TypeSizes.sizeof(cm.consistency.name());
}
}
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 1710cfd..10472c1 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.db;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableId;
public interface IMutation
{
+ public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+
public void apply();
public String getKeyspaceName();
public Collection<TableId> getTableIds();
@@ -40,6 +43,16 @@ public interface IMutation
}
/**
+ * Validates that the size of the mutation does not exceed {@link DatabaseDescriptor#getMaxMutationSize()}.
+ *
+ * @param version the MessagingService version the mutation is being serialized for.
+ * see {@link org.apache.cassandra.net.MessagingService#current_version}
+ * @param overhead extra size to add to the mutation's serialized size before validating. Pass zero if not required; must not be negative.
+ * @throws MutationExceededMaxSizeException if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded
+ */
+ public void validateSize(int version, int overhead);
+
+ /**
* Computes the total data size of the specified mutations.
* @param mutations the mutations
* @return the total data size of the specified mutations
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 3d27ef3..16d20db 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -38,6 +38,9 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
+import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
public class Mutation implements IMutation
@@ -119,6 +122,15 @@ public class Mutation implements IMutation
return modifications.values();
}
+ public void validateSize(int version, int overhead)
+ {
+ long totalSize = serializedSize(version) + overhead;
+ if(totalSize > MAX_MUTATION_SIZE)
+ {
+ throw new MutationExceededMaxSizeException(this, version, totalSize);
+ }
+ }
+
public PartitionUpdate getPartitionUpdate(TableMetadata table)
{
return table == null ? null : modifications.get(table.id);
@@ -256,6 +268,30 @@ public class Mutation implements IMutation
}
return buff.append("])").toString();
}
+ private int serializedSize30;
+ private int serializedSize3014;
+ private int serializedSize40;
+
+ public int serializedSize(int version)
+ {
+ switch (version)
+ {
+ case VERSION_30:
+ if (serializedSize30 == 0)
+ serializedSize30 = (int) serializer.serializedSize(this, VERSION_30);
+ return serializedSize30;
+ case VERSION_3014:
+ if (serializedSize3014 == 0)
+ serializedSize3014 = (int) serializer.serializedSize(this, VERSION_3014);
+ return serializedSize3014;
+ case VERSION_40:
+ if (serializedSize40 == 0)
+ serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
+ return serializedSize40;
+ default:
+ throw new IllegalStateException("Unknown serialization version: " + version);
+ }
+ }
/**
* Creates a new simple mutuation builder.
diff --git a/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java
new file mode 100644
index 0000000..084c21e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MutationExceededMaxSizeException.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
+import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE;
+
+public class MutationExceededMaxSizeException extends RuntimeException
+{
+ public static final int PARTITION_MESSAGE_LIMIT = 1024;
+
+ public final long mutationSize;
+
+ MutationExceededMaxSizeException(IMutation mutation, int serializationVersion, long totalSize)
+ {
+ super(prepareMessage(mutation, serializationVersion, totalSize));
+ this.mutationSize = totalSize;
+ }
+
+ private static String prepareMessage(IMutation mutation, int version, long totalSize)
+ {
+ List<String> topPartitions = mutation.getPartitionUpdates().stream()
+ .sorted((upd1, upd2) ->
+ Long.compare(PartitionUpdate.serializer.serializedSize(upd2, version),
+ PartitionUpdate.serializer.serializedSize(upd1, version)))
+ .map(upd -> String.format("%s.%s",
+ upd.metadata().name,
+ upd.metadata().partitionKeyType.getString(upd.partitionKey().getKey())))
+ .collect(Collectors.toList());
+
+ String topKeys = makeTopKeysString(topPartitions, PARTITION_MESSAGE_LIMIT);
+ return String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s. Top keys are: %s",
+ totalSize,
+ MAX_MUTATION_SIZE,
+ mutation.getKeyspaceName(),
+ topKeys);
+ }
+
+ @VisibleForTesting
+ static String makeTopKeysString(List<String> keys, int maxLength) {
+ Iterator<String> iterator = keys.listIterator();
+ StringBuilder stringBuilder = new StringBuilder();
+ while (iterator.hasNext())
+ {
+ String key = iterator.next();
+
+ if (stringBuilder.length() == 0)
+ {
+ stringBuilder.append(key); //ensures atleast one key is added
+ iterator.remove();
+ }
+ else if (stringBuilder.length() + key.length() + 2 <= maxLength) // 2 for ", "
+ {
+ stringBuilder.append(", ").append(key);
+ iterator.remove();
+ }
+ else
+ break;
+ }
+
+ if (keys.size() > 0)
+ stringBuilder.append(" and ").append(keys.size()).append(" more.");
+
+ return stringBuilder.toString();
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c9e79cd..e7f8743 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -64,10 +64,6 @@ public class CommitLog implements CommitLogMBean
public static final CommitLog instance = CommitLog.construct();
- // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
- // empty segments when writing large records
- final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
-
final public AbstractCommitLogSegmentManager segmentManager;
public final CommitLogArchiver archiver;
@@ -265,19 +261,13 @@ public class CommitLog implements CommitLogMBean
{
assert mutation != null;
+ mutation.validateSize(MessagingService.current_version, ENTRY_OVERHEAD_SIZE);
+
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
Mutation.serializer.serialize(mutation, dob, MessagingService.current_version);
int size = dob.getLength();
-
int totalSize = size + ENTRY_OVERHEAD_SIZE;
- if (totalSize > MAX_MUTATION_SIZE)
- {
- throw new IllegalArgumentException(String.format("Mutation of %s is too large for the maximum size of %s",
- FBUtilities.prettyPrintMemory(totalSize),
- FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
- }
-
Allocation alloc = segmentManager.allocate(mutation, totalSize);
CRC32 checksum = new CRC32();
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
index 6db0acd..09ac4a6 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -109,4 +109,9 @@ public final class VirtualMutation implements IMutation
{
// no-op
}
+
+ public void validateSize(int version, int overhead)
+ {
+ // no-op
+ }
}
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index 7e4618c..6c7c5d4 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -149,7 +149,7 @@ public final class Hint
{
long size = sizeof(hint.creationTime);
size += sizeofUnsignedVInt(hint.gcgs);
- size += Mutation.serializer.serializedSize(hint.mutation, version);
+ size += hint.mutation.serializedSize(version);
return size;
}
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 257634c..4f91d94 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -487,7 +487,7 @@ public class MigrationManager
{
int size = TypeSizes.sizeof(schema.size());
for (Mutation mutation : schema)
- size += Mutation.serializer.serializedSize(mutation, version);
+ size += mutation.serializedSize(version);
return size;
}
}
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index 3a4978e..68d1b4c 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -21,11 +21,11 @@ package org.apache.cassandra.service.reads.repair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.MutationExceededMaxSizeException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -33,6 +33,8 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.tracing.Tracing;
+import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE;
+
public class BlockingReadRepairs
{
private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepairs.class);
@@ -51,43 +53,42 @@ public class BlockingReadRepairs
DecoratedKey key = update.partitionKey();
Mutation mutation = new Mutation(update);
- Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
- TableMetadata metadata = update.metadata();
-
int messagingVersion = MessagingService.instance().versions.get(destination);
- int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
- int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
-
-
- if (mutationSize <= maxMutationSize)
+ try
{
+ mutation.validateSize(messagingVersion, 0);
return mutation;
}
- else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
- {
- logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
- mutationSize,
- maxMutationSize,
- metadata,
- metadata.partitionKeyType.getString(key.getKey()),
- destination);
- return null;
- }
- else
+ catch (MutationExceededMaxSizeException e)
{
- logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
- mutationSize,
- maxMutationSize,
- metadata,
- metadata.partitionKeyType.getString(key.getKey()),
- destination);
+ Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+ TableMetadata metadata = update.metadata();
- if (!suppressException)
+ if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
{
- int blockFor = consistency.blockFor(keyspace);
- Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+ e.mutationSize,
+ MAX_MUTATION_SIZE,
+ metadata,
+ metadata.partitionKeyType.getString(key.getKey()),
+ destination);
+ }
+ else
+ {
+ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+ e.mutationSize,
+ MAX_MUTATION_SIZE,
+ metadata,
+ metadata.partitionKeyType.getString(key.getKey()),
+ destination);
+
+ if (!suppressException)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ }
}
return null;
}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
index 074e183..41d6aab 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/MutationBench.java
@@ -100,7 +100,7 @@ public class MutationBench
Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata)));
mutation = (Mutation)UpdateBuilder.create(metadata, 1L).newRow(1L).add("commentid", 32L).makeMutation();
- buffer = ByteBuffer.allocate((int) Mutation.serializer.serializedSize(mutation, MessagingService.current_version));
+ buffer = ByteBuffer.allocate(mutation.serializedSize(MessagingService.current_version));
outputBuffer = new DataOutputBufferFixed(buffer);
inputBuffer = new DataInputBuffer(buffer, false);
diff --git a/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java
new file mode 100644
index 0000000..81d9735
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/MutationExceededMaxSizeExceptionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.db.MutationExceededMaxSizeException.makeTopKeysString;
+import static org.junit.Assert.*;
+
+public class MutationExceededMaxSizeExceptionTest
+{
+
+ @Test
+ public void testMakePKString()
+ {
+ List<String> keys = Arrays.asList("aaa", "bbb", "ccc");
+
+ assertEquals(0, makeTopKeysString(new ArrayList<>(), 1024).length());
+ assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 0));
+ assertEquals("aaa and 2 more.", makeTopKeysString(new ArrayList<>(keys), 5));
+ assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 13));
+ assertEquals("aaa, bbb, ccc", makeTopKeysString(new ArrayList<>(keys), 1024));
+ assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 8));
+ assertEquals("aaa, bbb and 1 more.", makeTopKeysString(new ArrayList<>(keys), 10));
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1e3f622..0e7f30d 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -69,9 +69,13 @@ import org.apache.cassandra.utils.KillerForTests;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.vint.VIntCoding;
+import org.junit.After;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.cassandra.db.marshal.IntegerType;
@@ -420,7 +424,7 @@ public abstract class CommitLogTest
max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
// Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
- int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
+ int mutationOverhead = rm.serializedSize(MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
max -= mutationOverhead;
// Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value.
@@ -447,7 +451,7 @@ public abstract class CommitLogTest
CommitLog.instance.add(rm);
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = MutationExceededMaxSizeException.class)
public void testExceedRecordLimit() throws Exception
{
Keyspace ks = Keyspace.open(KEYSPACE1);
@@ -459,6 +463,47 @@ public abstract class CommitLogTest
CommitLog.instance.add(rm);
throw new AssertionError("mutation larger than limit was accepted");
}
+ @Test
+ public void testExceedRecordLimitWithMultiplePartitions() throws Exception
+ {
+ CommitLog.instance.resetUnsafe(true);
+ List<Mutation> mutations = new ArrayList<>();
+ Keyspace ks = Keyspace.open(KEYSPACE1);
+ char[] keyChars = new char[MutationExceededMaxSizeException.PARTITION_MESSAGE_LIMIT];
+ Arrays.fill(keyChars, 'k');
+ String key = new String(keyChars);
+
+ // large mutation
+ mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD1).metadata(), 0, key)
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+ .build());
+
+ // smaller mutation
+ mutations.add(new RowUpdateBuilder(ks.getColumnFamilyStore(STANDARD2).metadata(), 0, key)
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize() - 1024))
+ .build());
+
+ Mutation mutation = Mutation.merge(mutations);
+ try
+ {
+ CommitLog.instance.add(Mutation.merge(mutations));
+ throw new AssertionError("mutation larger than limit was accepted");
+ }
+ catch (MutationExceededMaxSizeException exception)
+ {
+ String message = exception.getMessage();
+
+ long mutationSize = mutation.serializedSize(MessagingService.current_version) + ENTRY_OVERHEAD_SIZE;
+ final String expectedMessagePrefix = String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s.",
+ mutationSize,
+ DatabaseDescriptor.getMaxMutationSize(),
+ KEYSPACE1);
+ assertTrue(message.startsWith(expectedMessagePrefix));
+ assertTrue(message.contains(String.format("%s.%s and 1 more.", STANDARD1, key)));
+ }
+ }
protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
{
@@ -655,7 +700,7 @@ public abstract class CommitLogTest
{
DatabaseDescriptor.setAutoSnapshot(false);
Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
- Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
+ assertFalse(notDurableKs.getMetadata().params.durableWrites);
ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
new RowUpdateBuilder(cfs.metadata(), 0, "key1")
@@ -699,7 +744,7 @@ public abstract class CommitLogTest
SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
- Assert.assertFalse(activeSegments.isEmpty());
+ assertFalse(activeSegments.isEmpty());
File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
replayer.replayFiles(files);
@@ -736,7 +781,7 @@ public abstract class CommitLogTest
SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
- Assert.assertFalse(activeSegments.isEmpty());
+ assertFalse(activeSegments.isEmpty());
File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
replayer.replayFiles(files);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org