You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/09/06 06:07:56 UTC
git commit: include message initiation time to replicas patch by
jbellis; reviewed by vijay for CASSANDRA-2858
Updated Branches:
refs/heads/trunk 978d7bb7d -> 0e3a9a55e
include message initiation time to replicas
patch by jbellis; reviewed by vijay for CASSANDRA-2858
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3a9a55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3a9a55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3a9a55
Branch: refs/heads/trunk
Commit: 0e3a9a55e484707addf32f73d18014df61c8d8f9
Parents: 978d7bb
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Sep 3 10:49:49 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Sep 5 23:07:45 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 4 ++
.../cassandra/net/IncomingTcpConnection.java | 8 +++-
.../apache/cassandra/net/MessageDeliveryTask.java | 5 ++-
.../org/apache/cassandra/net/MessagingService.java | 4 +-
.../cassandra/net/OutboundTcpConnection.java | 25 ++++++++-------
.../cassandra/streaming/StreamInSession.java | 2 +
7 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ed7941..b08117d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
1.2-dev
+ * include message initiation time to replicas so they can more
+ accurately drop timed-out requests (CASSANDRA-2858)
* fix clientutil.jar dependencies (CASSANDRA-4566)
* optimize WriteResponse (CASSANDRA-4548)
* new metrics (CASSANDRA-4009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 731040a..3c9ead7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,10 @@ Upgrading
the ability to read data files from Cassandra versions at least
back to 0.6, so a non-rolling upgrade remains possible with just
one step.
+ - Server clock synchronization is more important in 1.2; replicas
+ will use a coordinator-provided timestamp to determine when a
+ request has timed out and is thus not worth proceeding with.
+ Using a service like NTP is strongly recommended.
- The hints schema was changed from 1.1 to 1.2. Cassandra automatically
snapshots and then truncates the hints column family as part of
starting up 1.2 for the first time. Additionally, upgraded nodes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 1b0ae05..5bf4c5d 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -174,10 +174,14 @@ public class IncomingTcpConnection extends Thread
private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
{
- if (version <= MessagingService.VERSION_11)
+ if (version < MessagingService.VERSION_12)
input.readInt(); // size of entire message. in 1.0+ this is just a placeholder
String id = input.readUTF();
+ long timestamp = version >= MessagingService.VERSION_12 // 1.2+ peers send the low 32 bits of their send time after the id
+ ? (System.currentTimeMillis() & 0xFFFFFFFF00000000L) | (input.readInt() & 0x00000000FFFFFFFFL) // local high half | wire low half; the mask undoes int->long sign extension
+ : System.currentTimeMillis();
+
MessageIn message = MessageIn.read(input, version, id);
if (message == null)
{
@@ -186,7 +190,7 @@ public class IncomingTcpConnection extends Thread
}
if (version <= MessagingService.current_version)
{
- MessagingService.instance().receive(message, id);
+ MessagingService.instance().receive(message, id, timestamp);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 02a9b1c..e6abdda 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -25,14 +25,15 @@ public class MessageDeliveryTask implements Runnable
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
private final MessageIn message;
- private final long constructionTime = System.currentTimeMillis();
+ private final long constructionTime;
private final String id;
- public MessageDeliveryTask(MessageIn message, String id)
+ public MessageDeliveryTask(MessageIn message, String id, long timestamp)
{
assert message != null;
this.message = message;
this.id = id;
+ constructionTime = timestamp;
}
public void run()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0090f87..258e712 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -655,7 +655,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- public void receive(MessageIn message, String id)
+ public void receive(MessageIn message, String id, long timestamp)
{
Tracing.instance().initializeFromMessage(message);
@@ -666,7 +666,7 @@ public final class MessagingService implements MessagingServiceMBean
if (message == null)
return;
- Runnable runnable = new MessageDeliveryTask(message, id);
+ Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
ExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
stage.execute(runnable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index c0c1bf2..e52b4cc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -123,7 +123,6 @@ public class OutboundTcpConnection extends Thread
}
MessageOut<?> m = qm.message;
- String id = qm.id;
if (m == CLOSE_SENTINEL)
{
disconnect();
@@ -132,7 +131,7 @@ public class OutboundTcpConnection extends Thread
if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
dropped.incrementAndGet();
else if (socket != null || connect())
- writeConnected(m, id);
+ writeConnected(qm);
else
// clear out the queue, else gossip messages back up.
active.clear();
@@ -161,11 +160,11 @@ public class OutboundTcpConnection extends Thread
|| (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
}
- private void writeConnected(MessageOut<?> message, String id)
+ private void writeConnected(QueuedMessage qm)
{
try
{
- write(message, id, out);
+ write(qm.message, qm.id, qm.timestamp, out, targetVersion);
completed++;
if (active.peek() == null)
{
@@ -183,21 +182,23 @@ public class OutboundTcpConnection extends Thread
}
}
- public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException
- {
- write(message, id, out, targetVersion);
- }
-
- public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException
+ public static void write(MessageOut message, String id, long timestamp, DataOutputStream out, int version) throws IOException
{
out.writeInt(MessagingService.PROTOCOL_MAGIC);
if (version < MessagingService.VERSION_12)
+ {
writeHeader(out, version, false);
- // 0.8 included a total message size int. 1.0 doesn't need it but expects it to be there.
- if (version < MessagingService.VERSION_12)
+ // 0.8 included a total message size int. 1.0 doesn't need it but expects it to be there.
out.writeInt(-1);
+ }
out.writeUTF(id);
+ if (version >= MessagingService.VERSION_12)
+ {
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) timestamp);
+ }
message.serialize(out, version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 47a8896..5753a15 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -170,6 +170,7 @@ public class StreamInSession extends AbstractStreamSession
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
OutboundTcpConnection.write(message,
String.valueOf(getSessionId()),
+ System.currentTimeMillis(),
out,
MessagingService.instance().getVersion(getHost()));
out.flush();
@@ -221,6 +222,7 @@ public class StreamInSession extends AbstractStreamSession
if (socket != null)
OutboundTcpConnection.write(reply.createMessage(),
context.right.toString(),
+ System.currentTimeMillis(),
new DataOutputStream(socket.getOutputStream()),
MessagingService.instance().getVersion(getHost()));
else