You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/03/31 10:20:16 UTC

[cassandra] branch trunk updated: Fix race-conditions in ConnectionTest

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e08053b  Fix race-conditions in ConnectionTest
e08053b is described below

commit e08053b77cac4ec91fd398d7bad65bba1394f45f
Author: yifan-c <yc...@gmail.com>
AuthorDate: Fri Mar 13 11:30:43 2020 -0700

    Fix race-conditions in ConnectionTest
    
    patch by Yifan Cai; reviewed by Benjamin Lerer for CASSANDRA-15630
---
 test/unit/org/apache/cassandra/Util.java           | 22 +++++---
 .../org/apache/cassandra/net/ConnectionUtils.java  | 65 ++++++++++------------
 2 files changed, 44 insertions(+), 43 deletions(-)

diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 3dcaff7..c989407 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -41,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import afu.org.checkerframework.checker.oigj.qual.O;
 import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionTasks;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -579,18 +581,24 @@ public class Util
         }
     }
 
-    public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
+    public static void spinAssertEquals(Object expected, Supplier<Object> actualSupplier, int timeoutInSeconds)
     {
-        long start = System.currentTimeMillis();
-        Object lastValue = null;
-        while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds))
+        spinAssertEquals(null, expected, actualSupplier, timeoutInSeconds, TimeUnit.SECONDS);
+    }
+
+    public static <T> void spinAssertEquals(String message, T expected, Supplier<? extends T> actualSupplier, long timeout, TimeUnit timeUnit)
+    {
+        long startNano = System.nanoTime();
+        long expireAtNano = startNano + timeUnit.toNanos(timeout);
+        T actual = null;
+        while (System.nanoTime() < expireAtNano)
         {
-            lastValue = s.get();
-            if (lastValue.equals(expected))
+            actual = actualSupplier.get();
+            if (actual.equals(expected))
                 break;
             Thread.yield();
         }
-        assertEquals(expected, lastValue);
+        assertEquals(message, expected, actual);
     }
 
     public static void joinThread(Thread thread) throws InterruptedException
diff --git a/test/unit/org/apache/cassandra/net/ConnectionUtils.java b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
index e391785..5aff390 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionUtils.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
@@ -18,19 +18,17 @@
 
 package org.apache.cassandra.net;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.Assert;
-
-import org.apache.cassandra.net.InboundMessageHandlers;
-import org.apache.cassandra.net.OutboundConnection;
+import static org.apache.cassandra.Util.spinAssertEquals;
 
 public class ConnectionUtils
 {
     public interface FailCheck
     {
-        public void accept(String message, long expected, long actual);
+        public void accept(String message, Long expected, Supplier<Long> actualSupplier);
     }
 
     public static class OutboundCountChecker
@@ -98,44 +96,44 @@ public class ConnectionUtils
 
         public void check()
         {
-            doCheck(Assert::assertEquals);
+            doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS));
         }
 
         public void check(FailCheck failCheck)
         {
-            doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); });
+            doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); });
         }
 
         private void doCheck(FailCheck testAndFailCheck)
         {
             if (checkSubmitted)
             {
-                testAndFailCheck.accept("submitted count values don't match", submitted, connection.submittedCount());
+                testAndFailCheck.accept("submitted count values don't match", submitted, connection::submittedCount);
             }
             if (checkPending)
             {
-                testAndFailCheck.accept("pending count values don't match", pending, connection.pendingCount());
-                testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection.pendingBytes());
+                testAndFailCheck.accept("pending count values don't match", pending, () -> (long) connection.pendingCount());
+                testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection::pendingBytes);
             }
             if (checkSent)
             {
-                testAndFailCheck.accept("sent count values don't match", sent, connection.sentCount());
-                testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection.sentBytes());
+                testAndFailCheck.accept("sent count values don't match", sent, connection::sentCount);
+                testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection::sentBytes);
             }
             if (checkOverload)
             {
-                testAndFailCheck.accept("overload count values don't match", overload, connection.overloadedCount());
-                testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection.overloadedBytes());
+                testAndFailCheck.accept("overload count values don't match", overload, connection::overloadedCount);
+                testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection::overloadedBytes);
             }
             if (checkExpired)
             {
-                testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount());
-                testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection.expiredBytes());
+                testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount);
+                testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes);
             }
             if (checkError)
             {
-                testAndFailCheck.accept("error count values don't match", error, connection.errorCount());
-                testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes());
+                testAndFailCheck.accept("error count values don't match", error, connection::errorCount);
+                testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes);
             }
         }
     }
@@ -197,45 +195,40 @@ public class ConnectionUtils
 
         public void check()
         {
-            doCheck(Assert::assertEquals);
+            doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS));
         }
 
         public void check(FailCheck failCheck)
         {
-            doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); });
+            doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); });
         }
 
         private void doCheck(FailCheck testAndFailCheck)
         {
             if (checkReceived)
             {
-                testAndFailCheck.accept("received count values don't match", received, connection.receivedCount());
-                testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection.receivedBytes());
+                testAndFailCheck.accept("received count values don't match", received, connection::receivedCount);
+                testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection::receivedBytes);
             }
             if (checkProcessed)
             {
-                testAndFailCheck.accept("processed count values don't match", processed, connection.processedCount());
-                testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection.processedBytes());
+                testAndFailCheck.accept("processed count values don't match", processed, connection::processedCount);
+                testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection::processedBytes);
             }
             if (checkExpired)
             {
-                testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount());
-                testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection.expiredBytes());
+                testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount);
+                testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes);
             }
             if (checkError)
             {
-                testAndFailCheck.accept("error count values don't match", error, connection.errorCount());
-                testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes());
+                testAndFailCheck.accept("error count values don't match", error, connection::errorCount);
+                testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes);
             }
             if (checkScheduled)
             {
-                // scheduled cannot relied upon to not race with completion of the task,
-                // so if it is currently above the value we expect, sleep for a bit
-                if (scheduled < connection.scheduledCount())
-                    for (int i = 0; i < 10 && scheduled < connection.scheduledCount() ; ++i)
-                        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
-                testAndFailCheck.accept("scheduled count values don't match", scheduled, connection.scheduledCount());
-                testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection.scheduledBytes());
+                testAndFailCheck.accept("scheduled count values don't match", scheduled, connection::scheduledCount);
+                testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection::scheduledBytes);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org