You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/06/21 20:19:33 UTC

git commit: split up rpc timeout by operation type patch by Melvin Wang and jbellis; reviewed by vijay for CASSANDRA-2819

Updated Branches:
  refs/heads/trunk ced78f3c4 -> e6610e469


split up rpc timeout by operation type
patch by Melvin Wang and jbellis; reviewed by vijay for CASSANDRA-2819


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6610e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6610e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6610e46

Branch: refs/heads/trunk
Commit: e6610e4692800485501e316df3dab53a6e9e34b2
Parents: ced78f3
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed May 16 14:49:56 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jun 21 13:19:02 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    5 +
 conf/cassandra.yaml                                |   13 +++-
 src/java/org/apache/cassandra/config/Config.java   |   10 ++-
 .../cassandra/config/DatabaseDescriptor.java       |   70 +++++++++++++++
 .../org/apache/cassandra/db/RangeSliceCommand.java |   10 ++-
 src/java/org/apache/cassandra/db/ReadCommand.java  |    6 ++
 .../org/apache/cassandra/dht/BootStrapper.java     |    2 +-
 .../apache/cassandra/net/MessageDeliveryTask.java  |    2 +-
 src/java/org/apache/cassandra/net/MessageIn.java   |    6 ++
 src/java/org/apache/cassandra/net/MessageOut.java  |    6 ++
 .../org/apache/cassandra/net/MessagingService.java |   17 +---
 .../cassandra/net/OutboundTcpConnection.java       |    4 +-
 .../service/AbstractWriteResponseHandler.java      |    3 +-
 .../org/apache/cassandra/service/IReadCommand.java |    1 +
 .../org/apache/cassandra/service/ReadCallback.java |    2 +-
 .../apache/cassandra/service/RepairCallback.java   |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |   29 ++++---
 .../cassandra/service/StorageProxyMBean.java       |    8 ++
 .../cassandra/service/TruncateResponseHandler.java |    2 +-
 .../apache/cassandra/thrift/CassandraServer.java   |   10 +-
 21 files changed, 167 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8da61ba..344c87b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * split up rpc timeout by operation type (CASSANDRA-2819)
  * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
  * update MS protocol with a version handshake + broadcast address id
    (CASSANDRA-4311)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6cad6c2..23814b1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -36,6 +36,11 @@ Upgrading
     - Global option hinted_handoff_throttle_delay_in_ms has been removed.
       hinted_handoff_throttle_in_kb has been added instead.
 
+Features
+--------
+    - rpc_timeout has been split up to allow finer-grained control
+      on timeouts for different operation types
+
 
 1.1.1
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index cddd491..a584448 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -400,7 +400,18 @@ compaction_preheat_key_cache: true
 # When unset, the default is 400 Mbps or 50 MB/s.
 # stream_throughput_outbound_megabits_per_sec: 400
 
-# Time to wait for a reply from other nodes before failing the command 
+# How long the coordinator should wait for read operations to complete
+read_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for seq or index scans to complete
+range_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for writes to complete
+write_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for truncates to complete
+# (This can be much longer, because we need to flush all CFs
+# to make sure we can clear out anythink in the commitlog that could
+# cause truncated data to reappear.)
+truncate_timeout_in_ms: 300000
+# The default timeout for other, miscellaneous operations
 rpc_timeout_in_ms: 10000
 
 # Enable socket timeout for streaming operation.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e1c9a42..1cda551 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -45,7 +45,15 @@ public class Config
     /* initial token in the ring */
     public String initial_token;
 
-    public Long rpc_timeout_in_ms = new Long(2000);
+    public Long rpc_timeout_in_ms = new Long(10000);
+
+    public Long read_rpc_timeout_in_ms = new Long(10000);
+
+    public Long range_rpc_timeout_in_ms = new Long(10000);
+
+    public Long write_rpc_timeout_in_ms = new Long(10000);
+
+    public Long truncate_rpc_timeout_in_ms = new Long(300000);
 
     public Integer streaming_socket_timeout_in_ms = new Integer(0);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d94782c..12f5a4d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +46,12 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.EndpointSnitchInfo;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.CassandraDaemon;
 import org.apache.cassandra.utils.FBUtilities;
 import org.yaml.snakeyaml.Loader;
@@ -696,6 +699,73 @@ public class DatabaseDescriptor
         conf.rpc_timeout_in_ms = timeOutInMillis;
     }
 
+    public static long getReadRpcTimeout()
+    {
+        return conf.read_rpc_timeout_in_ms;
+    }
+
+    public static void setReadRpcTimeout(Long timeOutInMillis)
+    {
+        conf.read_rpc_timeout_in_ms = timeOutInMillis;
+    }
+
+    public static long getRangeRpcTimeout()
+    {
+        return conf.range_rpc_timeout_in_ms;
+    }
+
+    public static void setRangeRpcTimeout(Long timeOutInMillis)
+    {
+        conf.range_rpc_timeout_in_ms = timeOutInMillis;
+    }
+
+    public static long getWriteRpcTimeout()
+    {
+        return conf.write_rpc_timeout_in_ms;
+    }
+
+    public static void setWriteRpcTimeout(Long timeOutInMillis)
+    {
+        conf.write_rpc_timeout_in_ms = timeOutInMillis;
+    }
+
+    public static long getTruncateRpcTimeout()
+    {
+        return conf.truncate_rpc_timeout_in_ms;
+    }
+
+    public static void setTruncateRpcTimeout(Long timeOutInMillis)
+    {
+        conf.truncate_rpc_timeout_in_ms = timeOutInMillis;
+    }
+
+    // not part of the Verb enum so we can change timeouts easily via JMX
+    public static long getTimeout(MessagingService.Verb verb)
+    {
+        switch (verb)
+        {
+            case READ:
+                return getReadRpcTimeout();
+            case RANGE_SLICE:
+                return getRangeRpcTimeout();
+            case TRUNCATE:
+                return getTruncateRpcTimeout();
+            case READ_REPAIR:
+            case MUTATION:
+                return getWriteRpcTimeout();
+            default:
+                return getRpcTimeout();
+        }
+    }
+
+    /**
+     * @return the minimum configured {read, write, range, truncate, misc} timeout
+     */
+    public static long getMinRpcTimeout()
+    {
+        return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout());
+    }
+
     public static double getPhiConvictThreshold()
     {
         return conf.phi_convict_threshold;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c0054ea..5511127 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -43,8 +43,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -168,6 +171,11 @@ public class RangeSliceCommand implements IReadCommand
         SlicePredicate pred = RangeSliceCommandSerializer.asSlicePredicate(predicate);
         return new IndexScanCommand(keyspace, column_family, clause, pred, range);
     }
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getRangeRpcTimeout();
+    }
 }
 
 class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 8749c46..74d8fba 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -99,6 +100,11 @@ public abstract class ReadCommand implements IReadCommand
     {
         // noop
     }
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getReadRpcTimeout();
+    }
 }
 
 class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 7b14586..8abc48d 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -155,7 +155,7 @@ public class BootStrapper
     {
         MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
         int retries = 5;
-        long timeout = Math.max(MessagingService.getDefaultCallbackTimeout(), BOOTSTRAP_TIMEOUT);
+        long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT);
 
         while (retries > 0)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a5fd614..dc50ef8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -41,7 +41,7 @@ public class MessageDeliveryTask implements Runnable
     {
         MessagingService.Verb verb = message.verb;
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+            && System.currentTimeMillis() > constructionTime + message.getTimeout())
         {
             MessagingService.instance().incrementDroppedMessages(verb);
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index f3b712e..38e376f 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -99,6 +100,11 @@ public class MessageIn<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getTimeout(verb);
+    }
+
     public String toString()
     {
         StringBuilder sbuf = new StringBuilder("");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index fe09c2e..78546c8 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.FBUtilities;
@@ -87,6 +88,11 @@ public class MessageOut<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getTimeout(verb);
+    }
+
     public String toString()
     {
         StringBuilder sbuf = new StringBuilder("");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c0af377..18f3baa 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -285,7 +285,6 @@ public final class MessagingService implements MessagingServiceMBean
     private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
     private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
-    private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout();
 
     // protocol versions of the other nodes in the cluster
     private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>();
@@ -324,7 +323,7 @@ public final class MessagingService implements MessagingServiceMBean
             public Object apply(Pair<String, CallbackInfo> pair)
             {
                 CallbackInfo expiredCallbackInfo = pair.right;
-                maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) DatabaseDescriptor.getRpcTimeout());
+                maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) expiredCallbackInfo.sentMessage.getTimeout());
                 totalTimeouts++;
                 String ip = expiredCallbackInfo.target.getHostAddress();
                 AtomicLong c = timeoutsPerHost.get(ip);
@@ -350,7 +349,7 @@ public final class MessagingService implements MessagingServiceMBean
             }
         };
 
-        callbacks = new ExpiringMap<String, CallbackInfo>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
+        callbacks = new ExpiringMap<String, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -488,11 +487,6 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to)
-    {
-        return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT);
-    }
-
     public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
     {
         String messageId = nextId();
@@ -520,7 +514,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
     {
-        return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
+        return sendRR(message, to, cb, message.getTimeout());
     }
 
     /**
@@ -914,11 +908,6 @@ public final class MessagingService implements MessagingServiceMBean
         return completedTasks;
     }
 
-    public static long getDefaultCallbackTimeout()
-    {
-        return DEFAULT_CALLBACK_TIMEOUT;
-    }
-
     public Map<String, Integer> getDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, Integer>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a5d8181..bdf8223 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -131,7 +131,7 @@ public class OutboundTcpConnection extends Thread
                 disconnect();
                 continue;
             }
-            if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+            if (entry.timestamp < System.currentTimeMillis() - m.getTimeout())
                 dropped.incrementAndGet();
             else if (socket != null || connect())
                 writeConnected(m, id);
@@ -312,7 +312,7 @@ public class OutboundTcpConnection extends Thread
         while (true)
         {
             Entry entry = backlog.peek();
-            if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+            if (entry == null || entry.timestamp >= System.currentTimeMillis() - entry.message.getTimeout())
                 break;
 
             Entry entry2 = backlog.poll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index a6c38ae..db85a47 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -44,7 +44,8 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
 
     public void get() throws TimeoutException
     {
-        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
+
         boolean success;
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/IReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java
index 743c80b..c6a129e 100644
--- a/src/java/org/apache/cassandra/service/IReadCommand.java
+++ b/src/java/org/apache/cassandra/service/IReadCommand.java
@@ -20,4 +20,5 @@ package org.apache.cassandra.service;
 public interface IReadCommand
 {
     public String getKeyspace();
+    public long getTimeout();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 6608b3e..a3d273c 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -128,7 +128,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     public TResolved get() throws TimeoutException, DigestMismatchException, IOException
     {
-        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
         boolean success;
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
index 97f3079..8ded3e0 100644
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ b/src/java/org/apache/cassandra/service/RepairCallback.java
@@ -55,7 +55,7 @@ public class RepairCallback implements IAsyncCallback
 
     public Row get() throws TimeoutException, DigestMismatchException, IOException
     {
-        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
         try
         {
             condition.await(timeout, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 51f5fe2..92a3256 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -430,7 +430,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     InetAddress destination = iter.next();
                     CompactEndpointSerializationHelper.serialize(destination, dos);
-                    String id = MessagingService.instance().addCallback(handler, message, destination);
+                    String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
                     dos.writeUTF(id);
                     if (logger.isDebugEnabled())
                         logger.debug("Adding FWD message to: " + destination + " with ID " + id);
@@ -763,7 +763,7 @@ public class StorageProxy implements StorageProxyMBean
                     RepairCallback handler = repairResponseHandlers.get(i);
                     // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
                     // behind on writes in case the out-of-sync row is read multiple times in quick succession
-                    FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
+                    FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 
                     Row row;
                     try
@@ -903,7 +903,7 @@ public class StorageProxy implements StorageProxyMBean
                             columnsCount += row.getLiveColumnCount();
                             logger.debug("range slices read {}", row.key);
                         }
-                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
+                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                     }
                     catch (TimeoutException ex)
                     {
@@ -1237,7 +1237,7 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb))
             {
                 MessagingService.instance().incrementDroppedMessages(verb);
                 return;
@@ -1282,13 +1282,18 @@ public class StorageProxy implements StorageProxyMBean
             logger.warn("Some hints were not written before shutdown.  This is not supposed to happen.  You should (a) run repair, and (b) file a bug report");
     }
 
-    public Long getRpcTimeout()
-    {
-        return DatabaseDescriptor.getRpcTimeout();
-    }
+    public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
+    public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
 
-    public void setRpcTimeout(Long timeoutInMillis)
-    {
-        DatabaseDescriptor.setRpcTimeout(timeoutInMillis);
-    }
+    public Long getReadRpcTimeout() { return DatabaseDescriptor.getReadRpcTimeout(); }
+    public void setReadRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setReadRpcTimeout(timeoutInMillis); }
+
+    public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(); }
+    public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); }
+
+    public Long getRangeRpcTimeout() { return DatabaseDescriptor.getRangeRpcTimeout(); }
+    public void setRangeRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRangeRpcTimeout(timeoutInMillis); }
+
+    public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
+    public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index f1fb25d..dd1541a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -48,4 +48,12 @@ public interface StorageProxyMBean
 
     public Long getRpcTimeout();
     public void setRpcTimeout(Long timeoutInMillis);
+    public Long getReadRpcTimeout();
+    public void setReadRpcTimeout(Long timeoutInMillis);
+    public Long getWriteRpcTimeout();
+    public void setWriteRpcTimeout(Long timeoutInMillis);
+    public Long getRangeRpcTimeout();
+    public void setRangeRpcTimeout(Long timeoutInMillis);
+    public Long getTruncateRpcTimeout();
+    public void setTruncateRpcTimeout(Long timeoutInMillis);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index e5dc93d..e17211c 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -49,7 +49,7 @@ public class TruncateResponseHandler implements IAsyncCallback
 
     public void get() throws TimeoutException
     {
-        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        long timeout = DatabaseDescriptor.getTruncateRpcTimeout() - (System.currentTimeMillis() - startTime);
         boolean success;
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 54bf927..72a4677 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -104,7 +104,7 @@ public class CassandraServer implements Cassandra.Iface
         List<Row> rows;
         try
         {
-            schedule(DatabaseDescriptor.getRpcTimeout());
+            schedule(DatabaseDescriptor.getReadRpcTimeout());
             try
             {
                 rows = StorageProxy.read(commands, consistency_level);
@@ -626,7 +626,7 @@ public class CassandraServer implements Cassandra.Iface
             return;
         try
         {
-            schedule(DatabaseDescriptor.getRpcTimeout());
+            schedule(DatabaseDescriptor.getWriteRpcTimeout());
             try
             {
                 StorageProxy.mutate(mutations, consistency_level);
@@ -685,7 +685,7 @@ public class CassandraServer implements Cassandra.Iface
             {
                 bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
             }
-            schedule(DatabaseDescriptor.getRpcTimeout());
+            schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IFilter filter = ThriftValidation.asIFilter(predicate, metadata.getComparatorFor(column_parent.super_column));
@@ -745,7 +745,7 @@ public class CassandraServer implements Cassandra.Iface
         List<Row> rows;
         try
         {
-            schedule(DatabaseDescriptor.getRpcTimeout());
+            schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
@@ -1091,7 +1091,7 @@ public class CassandraServer implements Cassandra.Iface
         cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
         try
         {
-            schedule(DatabaseDescriptor.getRpcTimeout());
+            schedule(DatabaseDescriptor.getTruncateRpcTimeout());
             try
             {
                 StorageProxy.truncateBlocking(cState.getKeyspace(), cfname);