You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/03/31 10:20:16 UTC
[cassandra] branch trunk updated: Fix race-conditions in
ConnectionTest
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e08053b Fix race-conditions in ConnectionTest
e08053b is described below
commit e08053b77cac4ec91fd398d7bad65bba1394f45f
Author: yifan-c <yc...@gmail.com>
AuthorDate: Fri Mar 13 11:30:43 2020 -0700
Fix race-conditions in ConnectionTest
patch by Yifan Cai; reviewed by Benjamin Lerer for CASSANDRA-15630
---
test/unit/org/apache/cassandra/Util.java | 22 +++++---
.../org/apache/cassandra/net/ConnectionUtils.java | 65 ++++++++++------------
2 files changed, 44 insertions(+), 43 deletions(-)
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 3dcaff7..c989407 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -41,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import afu.org.checkerframework.checker.oigj.qual.O;
import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionTasks;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -579,18 +581,24 @@ public class Util
}
}
- public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
+ public static void spinAssertEquals(Object expected, Supplier<Object> actualSupplier, int timeoutInSeconds)
{
- long start = System.currentTimeMillis();
- Object lastValue = null;
- while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds))
+ spinAssertEquals(null, expected, actualSupplier, timeoutInSeconds, TimeUnit.SECONDS);
+ }
+
+ public static <T> void spinAssertEquals(String message, T expected, Supplier<? extends T> actualSupplier, long timeout, TimeUnit timeUnit)
+ {
+ long startNano = System.nanoTime();
+ long expireAtNano = startNano + timeUnit.toNanos(timeout);
+ T actual = null;
+ while (System.nanoTime() < expireAtNano)
{
- lastValue = s.get();
- if (lastValue.equals(expected))
+ actual = actualSupplier.get();
+ if (actual.equals(expected))
break;
Thread.yield();
}
- assertEquals(expected, lastValue);
+ assertEquals(message, expected, actual);
}
public static void joinThread(Thread thread) throws InterruptedException
diff --git a/test/unit/org/apache/cassandra/net/ConnectionUtils.java b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
index e391785..5aff390 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionUtils.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
@@ -18,19 +18,17 @@
package org.apache.cassandra.net;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.Assert;
-
-import org.apache.cassandra.net.InboundMessageHandlers;
-import org.apache.cassandra.net.OutboundConnection;
+import static org.apache.cassandra.Util.spinAssertEquals;
public class ConnectionUtils
{
public interface FailCheck
{
- public void accept(String message, long expected, long actual);
+ public void accept(String message, Long expected, Supplier<Long> actualSupplier);
}
public static class OutboundCountChecker
@@ -98,44 +96,44 @@ public class ConnectionUtils
public void check()
{
- doCheck(Assert::assertEquals);
+ doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS));
}
public void check(FailCheck failCheck)
{
- doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); });
+ doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); });
}
private void doCheck(FailCheck testAndFailCheck)
{
if (checkSubmitted)
{
- testAndFailCheck.accept("submitted count values don't match", submitted, connection.submittedCount());
+ testAndFailCheck.accept("submitted count values don't match", submitted, connection::submittedCount);
}
if (checkPending)
{
- testAndFailCheck.accept("pending count values don't match", pending, connection.pendingCount());
- testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection.pendingBytes());
+ testAndFailCheck.accept("pending count values don't match", pending, () -> (long) connection.pendingCount());
+ testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection::pendingBytes);
}
if (checkSent)
{
- testAndFailCheck.accept("sent count values don't match", sent, connection.sentCount());
- testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection.sentBytes());
+ testAndFailCheck.accept("sent count values don't match", sent, connection::sentCount);
+ testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection::sentBytes);
}
if (checkOverload)
{
- testAndFailCheck.accept("overload count values don't match", overload, connection.overloadedCount());
- testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection.overloadedBytes());
+ testAndFailCheck.accept("overload count values don't match", overload, connection::overloadedCount);
+ testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection::overloadedBytes);
}
if (checkExpired)
{
- testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount());
- testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection.expiredBytes());
+ testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount);
+ testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes);
}
if (checkError)
{
- testAndFailCheck.accept("error count values don't match", error, connection.errorCount());
- testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes());
+ testAndFailCheck.accept("error count values don't match", error, connection::errorCount);
+ testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes);
}
}
}
@@ -197,45 +195,40 @@ public class ConnectionUtils
public void check()
{
- doCheck(Assert::assertEquals);
+ doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS));
}
public void check(FailCheck failCheck)
{
- doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); });
+ doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); });
}
private void doCheck(FailCheck testAndFailCheck)
{
if (checkReceived)
{
- testAndFailCheck.accept("received count values don't match", received, connection.receivedCount());
- testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection.receivedBytes());
+ testAndFailCheck.accept("received count values don't match", received, connection::receivedCount);
+ testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection::receivedBytes);
}
if (checkProcessed)
{
- testAndFailCheck.accept("processed count values don't match", processed, connection.processedCount());
- testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection.processedBytes());
+ testAndFailCheck.accept("processed count values don't match", processed, connection::processedCount);
+ testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection::processedBytes);
}
if (checkExpired)
{
- testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount());
- testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection.expiredBytes());
+ testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount);
+ testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes);
}
if (checkError)
{
- testAndFailCheck.accept("error count values don't match", error, connection.errorCount());
- testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes());
+ testAndFailCheck.accept("error count values don't match", error, connection::errorCount);
+ testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes);
}
if (checkScheduled)
{
- // scheduled cannot relied upon to not race with completion of the task,
- // so if it is currently above the value we expect, sleep for a bit
- if (scheduled < connection.scheduledCount())
- for (int i = 0; i < 10 && scheduled < connection.scheduledCount() ; ++i)
- Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
- testAndFailCheck.accept("scheduled count values don't match", scheduled, connection.scheduledCount());
- testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection.scheduledBytes());
+ testAndFailCheck.accept("scheduled count values don't match", scheduled, connection::scheduledCount);
+ testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection::scheduledBytes);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org