You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/09/30 09:27:19 UTC
[cassandra] 03/03: Correct the internode message timestamp if sending node has wrapped
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 84ec1dc97d6358bd569d5467cb150abd0fc8939b
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Sep 27 10:31:12 2021 +0200
Correct the internode message timestamp if sending node has wrapped
Patch by marcuse; reviewed by Jon Meredith for CASSANDRA-16997
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Message.java | 23 +++++++++++++++++++++-
.../org/apache/cassandra/utils/MonotonicClock.java | 7 +++++--
.../unit/org/apache/cassandra/net/MessageTest.java | 15 +++++++++++++-
.../apache/cassandra/utils/FreeRunningClock.java | 13 ++++++++++--
5 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c0ba4f..e613c80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.2
+ * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
* Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
* Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
* Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index ca74012..2640d8f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -30,6 +30,9 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
@@ -42,6 +45,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.tracing.Tracing.TraceType;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClockTranslation;
+import org.apache.cassandra.utils.NoSpamLogger;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -65,6 +69,9 @@ import static org.apache.cassandra.utils.vint.VIntCoding.skipUnsignedVInt;
*/
public class Message<T>
{
+ private static final Logger logger = LoggerFactory.getLogger(Message.class);
+ private static final NoSpamLogger noSpam1m = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
public final Header header;
public final T payload;
@@ -1046,7 +1053,8 @@ public class Message<T>
private static final long TIMESTAMP_WRAPAROUND_GRACE_PERIOD_START = 0xFFFFFFFFL - MINUTES.toMillis(15L);
private static final long TIMESTAMP_WRAPAROUND_GRACE_PERIOD_END = MINUTES.toMillis(15L);
- private static long calculateCreationTimeNanos(int messageTimestampMillis, MonotonicClockTranslation timeSnapshot, long currentTimeNanos)
+ @VisibleForTesting
+ static long calculateCreationTimeNanos(int messageTimestampMillis, MonotonicClockTranslation timeSnapshot, long currentTimeNanos)
{
long currentTimeMillis = timeSnapshot.toMillisSinceEpoch(currentTimeNanos);
// Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
@@ -1064,8 +1072,21 @@ public class Message<T>
{
highBits -= 0x0000000100000000L;
}
+ // if the message timestamp wrapped, but we still haven't, add one highBit
+ else if (sentLowBits < TIMESTAMP_WRAPAROUND_GRACE_PERIOD_END
+ && currentLowBits > TIMESTAMP_WRAPAROUND_GRACE_PERIOD_START)
+ {
+ highBits += 0x0000000100000000L;
+ }
long sentTimeMillis = (highBits | sentLowBits);
+
+ if (Math.abs(currentTimeMillis - sentTimeMillis) > MINUTES.toMillis(15))
+ {
+ noSpam1m.warn("Bad timestamp {} generated, overriding with currentTimeMillis = {}", sentTimeMillis, currentTimeMillis);
+ sentTimeMillis = currentTimeMillis;
+ }
+
return timeSnapshot.fromMillisSinceEpoch(sentTimeMillis);
}
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index 5a1aa3c..bd69bd5 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,13 +136,15 @@ public interface MonotonicClock
private static final String UPDATE_INTERVAL_PROPERTY = Config.PROPERTY_PREFIX + "NANOTIMETOMILLIS_TIMESTAMP_UPDATE_INTERVAL";
private static final long UPDATE_INTERVAL_MS = Long.getLong(UPDATE_INTERVAL_PROPERTY, 10000);
- private static class AlmostSameTime implements MonotonicClockTranslation
+ @VisibleForTesting
+ static class AlmostSameTime implements MonotonicClockTranslation
{
final long millisSinceEpoch;
final long monotonicNanos;
final long error; // maximum error of millis measurement (in nanos)
- private AlmostSameTime(long millisSinceEpoch, long monotonicNanos, long errorNanos)
+ @VisibleForTesting
+ AlmostSameTime(long millisSinceEpoch, long monotonicNanos, long errorNanos)
{
this.millisSinceEpoch = millisSinceEpoch;
this.monotonicNanos = monotonicNanos;
diff --git a/test/unit/org/apache/cassandra/net/MessageTest.java b/test/unit/org/apache/cassandra/net/MessageTest.java
index f32219c..a0deadf 100644
--- a/test/unit/org/apache/cassandra/net/MessageTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.tracing.Tracing.TraceType;
import org.apache.cassandra.utils.FBUtilities;
-import org.assertj.core.api.Assertions;
+import org.apache.cassandra.utils.FreeRunningClock;
import static org.apache.cassandra.net.Message.serializer;
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
@@ -282,4 +282,17 @@ public class MessageTest
else
assertEquals(payload1, payload2);
}
+
+    @Test
+    public void testCreationTime()
+    {
+        // The sender (remoteTime) has just wrapped its low 32 bits to 0; the receiver (localTime), 1ms behind, has not.
+        long remoteTime = 1632087572480L; // 10111110000000000000000000000000000000000
+        long localTime = 1632087572479L;  // 10111101111111111111111111111111111111111
+        FreeRunningClock localClock = new FreeRunningClock(TimeUnit.DAYS.toNanos(1), localTime, 0);
+        int remoteCreatedAt = (int) (remoteTime & 0x00000000FFFFFFFFL); // only the low 32 bits are sent on the wire
+        long localTimeNanos = localClock.now();
+        // Assert the exact value: a bare "> 0" also passes without the wraparound fix, via the 15-minute "bad timestamp" fallback.
+        assertEquals(localTimeNanos + TimeUnit.MILLISECONDS.toNanos(1), Message.Serializer.calculateCreationTimeNanos(remoteCreatedAt, localClock.translate(), localTimeNanos));
+    }
}
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
index 4d8a5f6..1947f57 100644
--- a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
public class FreeRunningClock implements MonotonicClock
{
private long nanoTime;
+ private long millisSinceEpoch;
+ private long error;
public FreeRunningClock()
{
@@ -37,6 +39,13 @@ public class FreeRunningClock implements MonotonicClock
this.nanoTime = nanoTime;
}
+ /**
+ * Creates a test clock with an explicit wall-clock pairing.
+ *
+ * @param nanoTime initial value of the clock's monotonic nanoTime
+ * @param millisSinceEpoch wall-clock millis that translate() pairs with nanoTime
+ * @param error value reported by error() and passed into the translation
+ */
+ public FreeRunningClock(long nanoTime, long millisSinceEpoch, long error)
+ {
+ this.nanoTime = nanoTime;
+ this.millisSinceEpoch = millisSinceEpoch;
+ this.error = error;
+ }
+
@Override
public long now()
{
@@ -46,13 +55,13 @@ public class FreeRunningClock implements MonotonicClock
@Override
public long error()
{
- return 0;
+ // Report the error supplied to the 3-arg constructor instead of a hard-coded 0.
+ return error;
}
@Override
public MonotonicClockTranslation translate()
{
- throw new UnsupportedOperationException();
+ // Pair the clock's current nanoTime with the millisSinceEpoch fixed at construction.
+ // NOTE(review): nanoTime is a mutable field while millisSinceEpoch is not reassigned in the visible code,
+ // so if the clock is advanced the anchor moves with it (presumably now() keeps mapping to the same
+ // millisSinceEpoch). Confirm that is intended for tests that advance the clock.
+ return new AbstractEpochSamplingClock.AlmostSameTime(millisSinceEpoch, nanoTime, error);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org