You are viewing a plain text version of this content. The canonical version is linked from the original web page (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/04/20 17:26:44 UTC
[cassandra] branch trunk updated: Harden internode message resource limit accounting against serialization failures
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6fa5300 Harden internode message resource limit accounting against serialization failures
6fa5300 is described below
commit 6fa5300682fbfcbaaae9d4593a015c18ab34df1f
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Tue Apr 20 09:41:05 2021 -0700
Harden internode message resource limit accounting against serialization failures
patch by Jon Meredith; reviewed by Benjamin Lerer, David Capwell for CASSANDRA-16616
---
CHANGES.txt | 1 +
.../UnrecoverableIllegalStateException.java | 26 +++++++++++++++++
.../apache/cassandra/net/OutboundConnection.java | 4 +--
.../org/apache/cassandra/net/ResourceLimits.java | 14 ++++++++-
.../cassandra/utils/JVMStabilityInspector.java | 5 ++++
.../apache/cassandra/net/ResourceLimitsTest.java | 34 +++++++++++++++++++++-
6 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6dd8bab..108a762 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-rc1
+ * Harden internode message resource limit accounting against serialization failures (CASSANDRA-16616)
* Add back the source release of python driver in tree to avoid fetching from GitHub APIs (CASSANDRA-16599)
* Fix false unavailable for queries due to cluster topology changes (CASSANDRA-16545)
* Fixed a race condition issue in nodetool repair where we poll for the error before seeing the error notification, leading to a less meaningful message (CASSANDRA-16585)
diff --git a/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java b/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java
new file mode 100644
index 0000000..2193630
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+public class UnrecoverableIllegalStateException extends RuntimeException
+{
+ public UnrecoverableIllegalStateException(String message) {
+ super(message);
+ }
+}
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 98c034c..82eb6ce 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -470,10 +470,10 @@ public class OutboundConnection
*/
private boolean onExpired(Message<?> message)
{
+ noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
releaseCapacity(1, canonicalSize(message));
expiredCount += 1;
expiredBytes += canonicalSize(message);
- noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
callbacks.onExpired(message, template.to);
return true;
}
@@ -485,11 +485,11 @@ public class OutboundConnection
*/
private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t)
{
+ logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
JVMStabilityInspector.inspectThrowable(t);
releaseCapacity(1, canonicalSize(message));
errorCount += 1;
errorBytes += message.serializedSize(messagingVersion);
- logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
callbacks.onFailedSerialize(message, template.to, messagingVersion, bytesWrittenToNetwork, t);
}
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java b/src/java/org/apache/cassandra/net/ResourceLimits.java
index 7658d5f..8899040 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.net;
+import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException;
+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public abstract class ResourceLimits
@@ -155,7 +157,17 @@ public abstract class ResourceLimits
{
assert amount >= 0;
long using = usingUpdater.addAndGet(this, -amount);
- assert using >= 0;
+ if (using < 0L)
+ {
+ // Should never be able to release more than was allocated. While recovery is
+ // possible, it would require synchronizing the closing of all outbound connections
+ // and reinitializing the Concurrent limit before reopening. For such an unlikely path
+ // (previously this was an assert), it is safer to terminate the JVM and have something external
+ // restart and get back to a known good state rather than intermittently crashing on any of
+ // the connections sharing this limit.
+ throw new UnrecoverableIllegalStateException(
+ "Internode messaging byte limits that are shared between connections is invalid (using="+using+")");
+ }
return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
}
}
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 8cab602..4c0f972 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,6 +109,10 @@ public final class JVMStabilityInspector
// the JVM behavior in case of OOM (CASSANDRA-13006).
throw (OutOfMemoryError) t;
}
+ else if (t instanceof UnrecoverableIllegalStateException)
+ {
+ isUnstable = true;
+ }
if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.die)
if (t instanceof FSError || t instanceof CorruptSSTableException)
diff --git a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
index 734d69a..f2f8a01 100644
--- a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
@@ -23,6 +23,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.net.ResourceLimits.*;
@@ -149,4 +155,30 @@ public class ResourceLimitsTest
assertEquals(0, limit.using());
assertEquals(numPermits, limit.remaining());
}
-}
\ No newline at end of file
+
+ @Test
+ public void negativeConcurrentUsingValueKillsJVMTest()
+ {
+ DatabaseDescriptor.daemonInitialization(); // Prevent NPE for DatabaseDescriptor.getDiskFailurePolicy
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ try
+ {
+ Concurrent concurrent = new Concurrent(1);
+ try
+ {
+ concurrent.release(2);
+ }
+ catch (Throwable tr)
+ {
+ JVMStabilityInspector.inspectThrowable(tr);
+ }
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
+ }
+ finally
+ {
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org