You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/06 06:07:56 UTC

git commit: include message initiation time to replicas; patch by jbellis; reviewed by vijay for CASSANDRA-2858

Updated Branches:
  refs/heads/trunk 978d7bb7d -> 0e3a9a55e


include message initiation time to replicas
patch by jbellis; reviewed by vijay for CASSANDRA-2858


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3a9a55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3a9a55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3a9a55

Branch: refs/heads/trunk
Commit: 0e3a9a55e484707addf32f73d18014df61c8d8f9
Parents: 978d7bb
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Sep 3 10:49:49 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Sep 5 23:07:45 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 NEWS.txt                                           |    4 ++
 .../cassandra/net/IncomingTcpConnection.java       |    8 +++-
 .../apache/cassandra/net/MessageDeliveryTask.java  |    5 ++-
 .../org/apache/cassandra/net/MessagingService.java |    4 +-
 .../cassandra/net/OutboundTcpConnection.java       |   25 ++++++++-------
 .../cassandra/streaming/StreamInSession.java       |    2 +
 7 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ed7941..b08117d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.2-dev
+ * include message initiation time to replicas so they can more
+   accurately drop timed-out requests (CASSANDRA-2858)
  * fix clientutil.jar dependencies (CASSANDRA-4566)
  * optimize WriteResponse (CASSANDRA-4548)
  * new metrics (CASSANDRA-4009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 731040a..3c9ead7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,10 @@ Upgrading
       the ability to read data files from Cassandra versions at least
       back to 0.6, so a non-rolling upgrade remains possible with just
       one step.
+    - Server clock synchronization is more important in 1.2; replicas
+      will use a coordinator-provided timestamp to determine when a
+      request has timed out and is thus not worth proceeding with.
+      Using a service like NTP is strongly recommended.
     - The hints schema was changed from 1.1 to 1.2. Cassandra automatically
       snapshots and then truncates the hints column family as part of
       starting up 1.2 for the first time.  Additionally, upgraded nodes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 1b0ae05..5bf4c5d 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -174,10 +174,14 @@ public class IncomingTcpConnection extends Thread
 
     private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
     {
-        if (version <= MessagingService.VERSION_11)
+        if (version < MessagingService.VERSION_12)
             input.readInt(); // size of entire message. in 1.0+ this is just a placeholder
 
         String id = input.readUTF();
+        // sender sent only its low 32 bits: keep our high 32 bits and zero-extend theirs (readInt() is signed)
+        long timestamp = version >= MessagingService.VERSION_12
+                       ? (System.currentTimeMillis() & 0xFFFFFFFF00000000L) | (input.readInt() & 0xFFFFFFFFL)
+                       : System.currentTimeMillis();
         MessageIn message = MessageIn.read(input, version, id);
         if (message == null)
         {
@@ -186,7 +190,7 @@ public class IncomingTcpConnection extends Thread
         }
         if (version <= MessagingService.current_version)
         {
-            MessagingService.instance().receive(message, id);
+            MessagingService.instance().receive(message, id, timestamp);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 02a9b1c..e6abdda 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -25,14 +25,15 @@ public class MessageDeliveryTask implements Runnable
     private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
 
     private final MessageIn message;
-    private final long constructionTime = System.currentTimeMillis();
+    private final long constructionTime;
     private final String id;
 
-    public MessageDeliveryTask(MessageIn message, String id)
+    public MessageDeliveryTask(MessageIn message, String id, long timestamp)
     {
         assert message != null;
         this.message = message;
         this.id = id;
+        constructionTime = timestamp;
     }
 
     public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0090f87..258e712 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -655,7 +655,7 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
-    public void receive(MessageIn message, String id)
+    public void receive(MessageIn message, String id, long timestamp)
     {
         Tracing.instance().initializeFromMessage(message);
 
@@ -666,7 +666,7 @@ public final class MessagingService implements MessagingServiceMBean
         if (message == null)
             return;
 
-        Runnable runnable = new MessageDeliveryTask(message, id);
+        Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
         stage.execute(runnable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index c0c1bf2..e52b4cc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -123,7 +123,6 @@ public class OutboundTcpConnection extends Thread
             }
 
             MessageOut<?> m = qm.message;
-            String id = qm.id;
             if (m == CLOSE_SENTINEL)
             {
                 disconnect();
@@ -132,7 +131,7 @@ public class OutboundTcpConnection extends Thread
             if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
                 dropped.incrementAndGet();
             else if (socket != null || connect())
-                writeConnected(m, id);
+                writeConnected(qm);
             else
                 // clear out the queue, else gossip messages back up.
                 active.clear();
@@ -161,11 +160,11 @@ public class OutboundTcpConnection extends Thread
                || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
     }
 
-    private void writeConnected(MessageOut<?> message, String id)
+    private void writeConnected(QueuedMessage qm)
     {
         try
         {
-            write(message, id, out);
+            write(qm.message, qm.id, qm.timestamp, out, targetVersion);
             completed++;
             if (active.peek() == null)
             {
@@ -183,21 +182,23 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException
-    {
-        write(message, id, out, targetVersion);
-    }
-
-    public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException
+    public static void write(MessageOut message, String id, long timestamp, DataOutputStream out, int version) throws IOException
     {
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
         if (version < MessagingService.VERSION_12)
+        {
             writeHeader(out, version, false);
-        // 0.8 included a total message size int.  1.0 doesn't need it but expects it to be there.
-        if (version <  MessagingService.VERSION_12)
+            // 0.8 included a total message size int.  1.0 doesn't need it but expects it to be there.
             out.writeInt(-1);
+        }
 
         out.writeUTF(id);
+        if (version >= MessagingService.VERSION_12)
+        {
+            // int cast keeps only the low-order half; the recipient supplies the high-order half from its own
+            // clock, so this assumes synchronized clocks and no 2^32 ms rollover between send and receive.
+            out.writeInt((int) timestamp);
+        }
         message.serialize(out, version);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 47a8896..5753a15 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -170,6 +170,7 @@ public class StreamInSession extends AbstractStreamSession
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
         OutboundTcpConnection.write(message,
                                     String.valueOf(getSessionId()),
+                                    System.currentTimeMillis(),
                                     out,
                                     MessagingService.instance().getVersion(getHost()));
         out.flush();
@@ -221,6 +222,7 @@ public class StreamInSession extends AbstractStreamSession
                 if (socket != null)
                     OutboundTcpConnection.write(reply.createMessage(),
                                                 context.right.toString(),
+                                                System.currentTimeMillis(),
                                                 new DataOutputStream(socket.getOutputStream()),
                                                 MessagingService.instance().getVersion(getHost()));
                 else