You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/31 05:55:45 UTC

svn commit: r1152541 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/net/

Author: jbellis
Date: Sun Jul 31 03:55:44 2011
New Revision: 1152541

URL: http://svn.apache.org/viewvc?rev=1152541&view=rev
Log:
split rpc_timeout into read, write, scan, and "other" timeouts
patch by Melvin Wang; reviewed by jbellis for CASSANDRA-2819

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Jul 31 03:55:44 2011
@@ -24,6 +24,8 @@
    (CASSANDRA-2953)
  * fix potential use of free'd native memory in SerializingCache 
    (CASSANDRA-1951)
+ * split rpc_timeout into read, write, scan, and "other" timeouts
+   (CASSANDRA-2819)
 
 
 0.8.3

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Sun Jul 31 03:55:44 2011
@@ -304,9 +304,20 @@ compaction_throughput_mb_per_sec: 16
 # key caches.
 compaction_preheat_key_cache: true
 
-# Time to wait for a reply from other nodes before failing the command 
+# The default timeout for callbacks. Individual verbs may use one of the more
+# specific timeouts below instead.
 rpc_timeout_in_ms: 10000
 
+# Time to process a read on the server node before failing the command
+read_rpc_timeout_in_ms: 10000
+
+# Time to process a range_slice on the server node before failing the command
+range_rpc_timeout_in_ms: 10000
+
+# Time to process mutation/binary/read_repair on the server node before failing
+# the command 
+write_rpc_timeout_in_ms: 10000
+
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
 # phi_convict_threshold: 8

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Sun Jul 31 03:55:44 2011
@@ -48,7 +48,13 @@ public class Config
     /* initial token in the ring */
     public String initial_token;
     
-    public Long rpc_timeout_in_ms = new Long(2000);
+    public Long rpc_timeout_in_ms = new Long(1000);
+
+    public Long read_rpc_timeout_in_ms = new Long(1000);
+
+    public Long range_rpc_timeout_in_ms = new Long(1000);
+    
+    public Long write_rpc_timeout_in_ms = new Long(1000);
 
     public Integer phi_convict_threshold = 8;
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sun Jul 31 03:55:44 2011
@@ -710,6 +710,21 @@ public class DatabaseDescriptor
         return conf.rpc_timeout_in_ms;
     }
 
+    public static long getReadRpcTimeout()
+    {
+        return conf.read_rpc_timeout_in_ms;
+    }
+
+    public static long getRangeRpcTimeout()
+    {
+        return conf.range_rpc_timeout_in_ms;
+    }
+
+    public static long getWriteRpcTimeout()
+    {
+        return conf.write_rpc_timeout_in_ms;
+    }
+
     public static int getPhiConvictThreshold()
     {
         return conf.phi_convict_threshold;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Sun Jul 31 03:55:44 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -127,7 +128,25 @@ public class Message
         	.append(separator);
         return sbuf.toString();
     }
-    
+
+    public long getMessageTimeout()
+    {
+        StorageService.Verb verb = getVerb();
+        switch (verb)
+        {
+            case READ:
+                return DatabaseDescriptor.getReadRpcTimeout();
+            case RANGE_SLICE:
+                return DatabaseDescriptor.getRangeRpcTimeout();
+            case READ_REPAIR:
+            case BINARY:
+            case MUTATION:
+                return DatabaseDescriptor.getWriteRpcTimeout();
+            default:
+                return DatabaseDescriptor.getRpcTimeout();
+        }
+    }
+
     private static class MessageSerializer implements ICompactSerializer<Message>
     {
         public void serialize(Message t, DataOutputStream dos, int version) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Sun Jul 31 03:55:44 2011
@@ -43,7 +43,7 @@ public class MessageDeliveryTask impleme
     { 
         StorageService.Verb verb = message.getVerb();
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+            && System.currentTimeMillis() > constructionTime + message.getMessageTimeout())
         {
             MessagingService.instance().incrementDroppedMessages(verb);
             return;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1152541&r1=1152540&r2=1152541&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Sun Jul 31 03:55:44 2011
@@ -297,14 +297,6 @@ public final class MessagingService impl
         return Integer.toString(idGen.incrementAndGet());
     }
 
-    /*
-     * @see #sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
-     */
-    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
-    {
-        return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
-    }
-
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -313,13 +305,12 @@ public final class MessagingService impl
      * @param cb callback interface which is used to pass the responses or
      *           suggest that a timeout occurred to the invoker of the send().
      *           suggest that a timeout occurred to the invoker of the send().
-     * @param timeout the timeout used for expiration
      * @return a reference to the message id used to match with the result
      */
-    public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
     {
         String id = nextId();
-        addCallback(cb, id, to, timeout);
+        addCallback(cb, id, to, message.getMessageTimeout());
         sendOneWay(message, id, to);
         return id;
     }