You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/04/20 17:26:44 UTC

[cassandra] branch trunk updated: Harden internode message resource limit accounting against serialization failures

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fa5300  Harden internode message resource limit accounting against serialization failures
6fa5300 is described below

commit 6fa5300682fbfcbaaae9d4593a015c18ab34df1f
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Tue Apr 20 09:41:05 2021 -0700

    Harden internode message resource limit accounting against serialization failures
    
    patch by Jon Meredith; reviewed by Benjamin Lerer, David Capwell for CASSANDRA-16616
---
 CHANGES.txt                                        |  1 +
 .../UnrecoverableIllegalStateException.java        | 26 +++++++++++++++++
 .../apache/cassandra/net/OutboundConnection.java   |  4 +--
 .../org/apache/cassandra/net/ResourceLimits.java   | 14 ++++++++-
 .../cassandra/utils/JVMStabilityInspector.java     |  5 ++++
 .../apache/cassandra/net/ResourceLimitsTest.java   | 34 +++++++++++++++++++++-
 6 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6dd8bab..108a762 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-rc1
+ * Harden internode message resource limit accounting against serialization failures (CASSANDRA-16616)
  * Add back the source release of python driver in tree to avoid fetching from GitHub APIs (CASSANDRA-16599)
  * Fix false unavailable for queries due to cluster topology changes (CASSANDRA-16545)
  * Fixed a race condition issue in nodetool repair where we poll for the error before seeing the error notification, leading to a less meaningful message (CASSANDRA-16585)
diff --git a/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java b/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java
new file mode 100644
index 0000000..2193630
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/UnrecoverableIllegalStateException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+/**
+ * Signals that internal state has been found to be invalid in a way that cannot safely be
+ * repaired while the process keeps running.
+ * <p>
+ * {@code JVMStabilityInspector.inspectThrowable} marks the JVM as unstable when it sees this
+ * exception, so code that catches it should hand it to the inspector rather than swallow it.
+ */
+public class UnrecoverableIllegalStateException extends RuntimeException
+{
+    /**
+     * @param message description of the invalid state, including the offending values
+     */
+    public UnrecoverableIllegalStateException(String message)
+    {
+        super(message);
+    }
+}
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index 98c034c..82eb6ce 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -470,10 +470,10 @@ public class OutboundConnection
      */
     private boolean onExpired(Message<?> message)
     {
+        noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
         releaseCapacity(1, canonicalSize(message));
         expiredCount += 1;
         expiredBytes += canonicalSize(message);
-        noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb());
         callbacks.onExpired(message, template.to);
         return true;
     }
@@ -485,11 +485,11 @@ public class OutboundConnection
      */
     private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t)
     {
+        logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
         JVMStabilityInspector.inspectThrowable(t);
         releaseCapacity(1, canonicalSize(message));
         errorCount += 1;
         errorBytes += message.serializedSize(messagingVersion);
-        logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
         callbacks.onFailedSerialize(message, template.to, messagingVersion, bytesWrittenToNetwork, t);
     }
 
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java b/src/java/org/apache/cassandra/net/ResourceLimits.java
index 7658d5f..8899040 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.net;
 
+import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException;
+
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 public abstract class ResourceLimits
@@ -155,7 +157,17 @@ public abstract class ResourceLimits
         {
             assert amount >= 0;
             long using = usingUpdater.addAndGet(this, -amount);
-            assert using >= 0;
+            if (using < 0L)
+            {
+                // Should never be able to release more than was allocated.  While recovery is
+                // possible it would require synchronizing the closing of all outbound connections
+                // and reinitializing the Concurrent limit before reopening.  For such an unlikely path
+                // (previously this was an assert), it is safer to terminate the JVM and have something external
+                // restart and get back to a known good state rather than intermittently crashing on any of
+                // the connections sharing this limit.
+                throw new UnrecoverableIllegalStateException(
+                    "Internode messaging byte limits that are shared between connections is invalid (using="+using+")");
+            }
             return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
         }
     }
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 8cab602..4c0f972 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.exceptions.UnrecoverableIllegalStateException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,6 +109,10 @@ public final class JVMStabilityInspector
             // the JVM behavior in case of OOM (CASSANDRA-13006).
             throw (OutOfMemoryError) t;
         }
+        else if (t instanceof UnrecoverableIllegalStateException)
+        {
+            isUnstable = true;
+        }
 
         if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.die)
             if (t instanceof FSError || t instanceof CorruptSSTableException)
diff --git a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
index 734d69a..f2f8a01 100644
--- a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
@@ -23,6 +23,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongFunction;
 
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.net.ResourceLimits.*;
@@ -149,4 +155,30 @@ public class ResourceLimitsTest
         assertEquals(0,          limit.using());
         assertEquals(numPermits, limit.remaining());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void negativeConcurrentUsingValueKillsJVMTest() // over-releasing a shared Concurrent limit must be treated as fatal
+    {
+        DatabaseDescriptor.daemonInitialization(); // Prevent NPE for DatabaseDescriptor.getDiskFailurePolicy
+        KillerForTests killerForTests = new KillerForTests(); // test killer: lets the assertions below observe kill requests
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); // restored in finally
+        try
+        {
+            Concurrent concurrent = new Concurrent(1); // nothing is acquired from this limit before the release below
+            try
+            {
+                concurrent.release(2); // over-release: expected to drive using() negative, which release() rejects by throwing
+            }
+            catch (Throwable tr)
+            {
+                JVMStabilityInspector.inspectThrowable(tr); // same hand-off OutboundConnection.onFailedSerialize performs
+            }
+            Assert.assertTrue(killerForTests.wasKilled()); // the inspector must have requested a JVM kill
+            Assert.assertFalse(killerForTests.wasKilledQuietly()); // only killed quietly on startup failure
+        }
+        finally
+        {
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org