You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this rendering.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/09/30 09:27:19 UTC

[cassandra] 03/03: Correct the internode message timestamp if sending node has wrapped

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 84ec1dc97d6358bd569d5467cb150abd0fc8939b
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Sep 27 10:31:12 2021 +0200

    Correct the internode message timestamp if sending node has wrapped
    
    Patch by marcuse; reviewed by Jon Meredith for CASSANDRA-16997
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/net/Message.java     | 23 +++++++++++++++++++++-
 .../org/apache/cassandra/utils/MonotonicClock.java |  7 +++++--
 .../unit/org/apache/cassandra/net/MessageTest.java | 15 +++++++++++++-
 .../apache/cassandra/utils/FreeRunningClock.java   | 13 ++++++++++--
 5 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0c0ba4f..e613c80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.2
+ * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
  * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
  * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
  * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index ca74012..2640d8f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -30,6 +30,9 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
@@ -42,6 +45,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.tracing.Tracing.TraceType;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MonotonicClockTranslation;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -65,6 +69,9 @@ import static org.apache.cassandra.utils.vint.VIntCoding.skipUnsignedVInt;
  */
 public class Message<T>
 {
+    private static final Logger logger = LoggerFactory.getLogger(Message.class);
+    private static final NoSpamLogger noSpam1m = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
     public final Header header;
     public final T payload;
 
@@ -1046,7 +1053,8 @@ public class Message<T>
         private static final long TIMESTAMP_WRAPAROUND_GRACE_PERIOD_START  = 0xFFFFFFFFL - MINUTES.toMillis(15L);
         private static final long TIMESTAMP_WRAPAROUND_GRACE_PERIOD_END    =               MINUTES.toMillis(15L);
 
-        private static long calculateCreationTimeNanos(int messageTimestampMillis, MonotonicClockTranslation timeSnapshot, long currentTimeNanos)
+        @VisibleForTesting
+        static long calculateCreationTimeNanos(int messageTimestampMillis, MonotonicClockTranslation timeSnapshot, long currentTimeNanos)
         {
             long currentTimeMillis = timeSnapshot.toMillisSinceEpoch(currentTimeNanos);
             // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
@@ -1064,8 +1072,21 @@ public class Message<T>
             {
                 highBits -= 0x0000000100000000L;
             }
+            // if the message timestamp wrapped, but we still haven't, add one highBit
+            else if (sentLowBits < TIMESTAMP_WRAPAROUND_GRACE_PERIOD_END
+                     && currentLowBits > TIMESTAMP_WRAPAROUND_GRACE_PERIOD_START)
+            {
+                highBits += 0x0000000100000000L;
+            }
 
             long sentTimeMillis = (highBits | sentLowBits);
+
+            if (Math.abs(currentTimeMillis - sentTimeMillis) > MINUTES.toMillis(15))
+            {
+                noSpam1m.warn("Bad timestamp {} generated, overriding with currentTimeMillis = {}", sentTimeMillis, currentTimeMillis);
+                sentTimeMillis = currentTimeMillis;
+            }
+
             return timeSnapshot.fromMillisSinceEpoch(sentTimeMillis);
         }
 
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index 5a1aa3c..bd69bd5 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -135,13 +136,15 @@ public interface MonotonicClock
         private static final String UPDATE_INTERVAL_PROPERTY = Config.PROPERTY_PREFIX + "NANOTIMETOMILLIS_TIMESTAMP_UPDATE_INTERVAL";
         private static final long UPDATE_INTERVAL_MS = Long.getLong(UPDATE_INTERVAL_PROPERTY, 10000);
 
-        private static class AlmostSameTime implements MonotonicClockTranslation
+        @VisibleForTesting
+        static class AlmostSameTime implements MonotonicClockTranslation
         {
             final long millisSinceEpoch;
             final long monotonicNanos;
             final long error; // maximum error of millis measurement (in nanos)
 
-            private AlmostSameTime(long millisSinceEpoch, long monotonicNanos, long errorNanos)
+            @VisibleForTesting
+            AlmostSameTime(long millisSinceEpoch, long monotonicNanos, long errorNanos)
             {
                 this.millisSinceEpoch = millisSinceEpoch;
                 this.monotonicNanos = monotonicNanos;
diff --git a/test/unit/org/apache/cassandra/net/MessageTest.java b/test/unit/org/apache/cassandra/net/MessageTest.java
index f32219c..a0deadf 100644
--- a/test/unit/org/apache/cassandra/net/MessageTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.tracing.Tracing.TraceType;
 import org.apache.cassandra.utils.FBUtilities;
-import org.assertj.core.api.Assertions;
+import org.apache.cassandra.utils.FreeRunningClock;
 
 import static org.apache.cassandra.net.Message.serializer;
 import static org.apache.cassandra.net.MessagingService.VERSION_3014;
@@ -282,4 +282,17 @@ public class MessageTest
         else
             assertEquals(payload1, payload2);
     }
+
+    @Test
+    public void testCreationTime()
+    {
+        long remoteTime = 1632087572480L; // 10111110000000000000000000000000000000000
+        long localTime  = 1632087572479L; // 10111101111111111111111111111111111111111
+        FreeRunningClock localClock  = new FreeRunningClock(TimeUnit.DAYS.toNanos(1), localTime, 0);
+
+        int remoteCreatedAt = (int) (remoteTime & 0x00000000FFFFFFFFL);
+        // assert the exact value: the 15 minute skew clamp would make a "> 0" check pass even without the wrap fix
+        long localTimeNanos = localClock.now();
+        assertEquals(localTimeNanos + TimeUnit.MILLISECONDS.toNanos(remoteTime - localTime), Message.Serializer.calculateCreationTimeNanos(remoteCreatedAt, localClock.translate(), localTimeNanos));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
index 4d8a5f6..1947f57 100644
--- a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 public class FreeRunningClock implements MonotonicClock
 {
     private long nanoTime;
+    private long millisSinceEpoch;
+    private long error;
 
     public FreeRunningClock()
     {
@@ -37,6 +39,13 @@ public class FreeRunningClock implements MonotonicClock
         this.nanoTime = nanoTime;
     }
 
+    public FreeRunningClock(long nanoTime, long millisSinceEpoch, long error)
+    {
+        this.nanoTime = nanoTime;
+        this.millisSinceEpoch = millisSinceEpoch;
+        this.error = error;
+    }
+
     @Override
     public long now()
     {
@@ -46,13 +55,13 @@ public class FreeRunningClock implements MonotonicClock
     @Override
     public long error()
     {
-        return 0;
+        return error;
     }
 
     @Override
     public MonotonicClockTranslation translate()
     {
-        throw new UnsupportedOperationException();
+        return new AbstractEpochSamplingClock.AlmostSameTime(millisSinceEpoch, nanoTime, error);
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org