You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/06/21 20:19:33 UTC
git commit: split up rpc timeout by operation type patch by Melvin
Wang and jbellis; reviewed by vijay for CASSANDRA-2819
Updated Branches:
refs/heads/trunk ced78f3c4 -> e6610e469
split up rpc timeout by operation type
patch by Melvin Wang and jbellis; reviewed by vijay for CASSANDRA-2819
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6610e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6610e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6610e46
Branch: refs/heads/trunk
Commit: e6610e4692800485501e316df3dab53a6e9e34b2
Parents: ced78f3
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed May 16 14:49:56 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jun 21 13:19:02 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 +
conf/cassandra.yaml | 13 +++-
src/java/org/apache/cassandra/config/Config.java | 10 ++-
.../cassandra/config/DatabaseDescriptor.java | 70 +++++++++++++++
.../org/apache/cassandra/db/RangeSliceCommand.java | 10 ++-
src/java/org/apache/cassandra/db/ReadCommand.java | 6 ++
.../org/apache/cassandra/dht/BootStrapper.java | 2 +-
.../apache/cassandra/net/MessageDeliveryTask.java | 2 +-
src/java/org/apache/cassandra/net/MessageIn.java | 6 ++
src/java/org/apache/cassandra/net/MessageOut.java | 6 ++
.../org/apache/cassandra/net/MessagingService.java | 17 +---
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../service/AbstractWriteResponseHandler.java | 3 +-
.../org/apache/cassandra/service/IReadCommand.java | 1 +
.../org/apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/RepairCallback.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 29 ++++---
.../cassandra/service/StorageProxyMBean.java | 8 ++
.../cassandra/service/TruncateResponseHandler.java | 2 +-
.../apache/cassandra/thrift/CassandraServer.java | 10 +-
21 files changed, 167 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8da61ba..344c87b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-dev
+ * split up rpc timeout by operation type (CASSANDRA-2819)
* rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
* update MS protocol with a version handshake + broadcast address id
(CASSANDRA-4311)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6cad6c2..23814b1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -36,6 +36,11 @@ Upgrading
- Global option hinted_handoff_throttle_delay_in_ms has been removed.
hinted_handoff_throttle_in_kb has been added instead.
+Features
+--------
+ - rpc_timeout has been split up to allow finer-grained control
+ over timeouts for different operation types
+
1.1.1
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index cddd491..a584448 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -400,7 +400,18 @@ compaction_preheat_key_cache: true
# When unset, the default is 400 Mbps or 50 MB/s.
# stream_throughput_outbound_megabits_per_sec: 400
-# Time to wait for a reply from other nodes before failing the command
+# How long the coordinator should wait for read operations to complete
+read_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for sequential or index scans to complete
+range_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for writes to complete
+write_rpc_timeout_in_ms: 10000
+# How long the coordinator should wait for truncates to complete
+# (This can be much longer, because we need to flush all CFs
+# to make sure we can clear out anything in the commitlog that could
+# cause truncated data to reappear.)
+truncate_rpc_timeout_in_ms: 300000
+# The default timeout for other, miscellaneous operations
rpc_timeout_in_ms: 10000
# Enable socket timeout for streaming operation.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e1c9a42..1cda551 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -45,7 +45,15 @@ public class Config
/* initial token in the ring */
public String initial_token;
- public Long rpc_timeout_in_ms = new Long(2000);
+ public Long rpc_timeout_in_ms = new Long(10000);
+
+ public Long read_rpc_timeout_in_ms = new Long(10000);
+
+ public Long range_rpc_timeout_in_ms = new Long(10000);
+
+ public Long write_rpc_timeout_in_ms = new Long(10000);
+
+ public Long truncate_rpc_timeout_in_ms = new Long(300000);
public Integer streaming_socket_timeout_in_ms = new Integer(0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d94782c..12f5a4d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -26,6 +26,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.*;
+import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +46,12 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.EndpointSnitchInfo;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.cassandra.utils.FBUtilities;
import org.yaml.snakeyaml.Loader;
@@ -696,6 +699,73 @@ public class DatabaseDescriptor
conf.rpc_timeout_in_ms = timeOutInMillis;
}
+ public static long getReadRpcTimeout()
+ {
+ return conf.read_rpc_timeout_in_ms;
+ }
+
+ public static void setReadRpcTimeout(Long timeOutInMillis)
+ {
+ conf.read_rpc_timeout_in_ms = timeOutInMillis;
+ }
+
+ public static long getRangeRpcTimeout()
+ {
+ return conf.range_rpc_timeout_in_ms;
+ }
+
+ public static void setRangeRpcTimeout(Long timeOutInMillis)
+ {
+ conf.range_rpc_timeout_in_ms = timeOutInMillis;
+ }
+
+ public static long getWriteRpcTimeout()
+ {
+ return conf.write_rpc_timeout_in_ms;
+ }
+
+ public static void setWriteRpcTimeout(Long timeOutInMillis)
+ {
+ conf.write_rpc_timeout_in_ms = timeOutInMillis;
+ }
+
+ public static long getTruncateRpcTimeout()
+ {
+ return conf.truncate_rpc_timeout_in_ms;
+ }
+
+ public static void setTruncateRpcTimeout(Long timeOutInMillis)
+ {
+ conf.truncate_rpc_timeout_in_ms = timeOutInMillis;
+ }
+
+ // not part of the Verb enum so we can change timeouts easily via JMX
+ public static long getTimeout(MessagingService.Verb verb)
+ {
+ switch (verb)
+ {
+ case READ:
+ return getReadRpcTimeout();
+ case RANGE_SLICE:
+ return getRangeRpcTimeout();
+ case TRUNCATE:
+ return getTruncateRpcTimeout();
+ case READ_REPAIR:
+ case MUTATION:
+ return getWriteRpcTimeout();
+ default:
+ return getRpcTimeout();
+ }
+ }
+
+ /**
+ * @return the minimum configured {read, write, range, truncate, misc} timeout
+ */
+ public static long getMinRpcTimeout()
+ {
+ return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout());
+ }
+
public static double getPhiConvictThreshold()
{
return conf.phi_convict_threshold;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c0054ea..5511127 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -43,8 +43,11 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -168,6 +171,11 @@ public class RangeSliceCommand implements IReadCommand
SlicePredicate pred = RangeSliceCommandSerializer.asSlicePredicate(predicate);
return new IndexScanCommand(keyspace, column_family, clause, pred, range);
}
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getRangeRpcTimeout();
+ }
}
class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 8749c46..74d8fba 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -99,6 +100,11 @@ public abstract class ReadCommand implements IReadCommand
{
// noop
}
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getReadRpcTimeout();
+ }
}
class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 7b14586..8abc48d 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -155,7 +155,7 @@ public class BootStrapper
{
MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
int retries = 5;
- long timeout = Math.max(MessagingService.getDefaultCallbackTimeout(), BOOTSTRAP_TIMEOUT);
+ long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT);
while (retries > 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a5fd614..dc50ef8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -41,7 +41,7 @@ public class MessageDeliveryTask implements Runnable
{
MessagingService.Verb verb = message.verb;
if (MessagingService.DROPPABLE_VERBS.contains(verb)
- && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+ && System.currentTimeMillis() > constructionTime + message.getTimeout())
{
MessagingService.instance().incrementDroppedMessages(verb);
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index f3b712e..38e376f 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -26,6 +26,7 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FileUtils;
@@ -99,6 +100,11 @@ public class MessageIn<T>
return MessagingService.verbStages.get(verb);
}
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getTimeout(verb);
+ }
+
public String toString()
{
StringBuilder sbuf = new StringBuilder("");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index fe09c2e..78546c8 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -27,6 +27,7 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.FBUtilities;
@@ -87,6 +88,11 @@ public class MessageOut<T>
return MessagingService.verbStages.get(verb);
}
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getTimeout(verb);
+ }
+
public String toString()
{
StringBuilder sbuf = new StringBuilder("");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c0af377..18f3baa 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -285,7 +285,6 @@ public final class MessagingService implements MessagingServiceMBean
private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
- private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout();
// protocol versions of the other nodes in the cluster
private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>();
@@ -324,7 +323,7 @@ public final class MessagingService implements MessagingServiceMBean
public Object apply(Pair<String, CallbackInfo> pair)
{
CallbackInfo expiredCallbackInfo = pair.right;
- maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) DatabaseDescriptor.getRpcTimeout());
+ maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) expiredCallbackInfo.sentMessage.getTimeout());
totalTimeouts++;
String ip = expiredCallbackInfo.target.getHostAddress();
AtomicLong c = timeoutsPerHost.get(ip);
@@ -350,7 +349,7 @@ public final class MessagingService implements MessagingServiceMBean
}
};
- callbacks = new ExpiringMap<String, CallbackInfo>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
+ callbacks = new ExpiringMap<String, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -488,11 +487,6 @@ public final class MessagingService implements MessagingServiceMBean
return verbHandlers.get(type);
}
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to)
- {
- return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT);
- }
-
public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
{
String messageId = nextId();
@@ -520,7 +514,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
{
- return sendRR(message, to, cb, DEFAULT_CALLBACK_TIMEOUT);
+ return sendRR(message, to, cb, message.getTimeout());
}
/**
@@ -914,11 +908,6 @@ public final class MessagingService implements MessagingServiceMBean
return completedTasks;
}
- public static long getDefaultCallbackTimeout()
- {
- return DEFAULT_CALLBACK_TIMEOUT;
- }
-
public Map<String, Integer> getDroppedMessages()
{
Map<String, Integer> map = new HashMap<String, Integer>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a5d8181..bdf8223 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -131,7 +131,7 @@ public class OutboundTcpConnection extends Thread
disconnect();
continue;
}
- if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+ if (entry.timestamp < System.currentTimeMillis() - m.getTimeout())
dropped.incrementAndGet();
else if (socket != null || connect())
writeConnected(m, id);
@@ -312,7 +312,7 @@ public class OutboundTcpConnection extends Thread
while (true)
{
Entry entry = backlog.peek();
- if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+ if (entry == null || entry.timestamp >= System.currentTimeMillis() - entry.message.getTimeout())
break;
Entry entry2 = backlog.poll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index a6c38ae..db85a47 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -44,7 +44,8 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
public void get() throws TimeoutException
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
+
boolean success;
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/IReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java
index 743c80b..c6a129e 100644
--- a/src/java/org/apache/cassandra/service/IReadCommand.java
+++ b/src/java/org/apache/cassandra/service/IReadCommand.java
@@ -20,4 +20,5 @@ package org.apache.cassandra.service;
public interface IReadCommand
{
public String getKeyspace();
+ public long getTimeout();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 6608b3e..a3d273c 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -128,7 +128,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public TResolved get() throws TimeoutException, DigestMismatchException, IOException
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
boolean success;
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
index 97f3079..8ded3e0 100644
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ b/src/java/org/apache/cassandra/service/RepairCallback.java
@@ -55,7 +55,7 @@ public class RepairCallback implements IAsyncCallback
public Row get() throws TimeoutException, DigestMismatchException, IOException
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
try
{
condition.await(timeout, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 51f5fe2..92a3256 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -430,7 +430,7 @@ public class StorageProxy implements StorageProxyMBean
{
InetAddress destination = iter.next();
CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler, message, destination);
+ String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
dos.writeUTF(id);
if (logger.isDebugEnabled())
logger.debug("Adding FWD message to: " + destination + " with ID " + id);
@@ -763,7 +763,7 @@ public class StorageProxy implements StorageProxyMBean
RepairCallback handler = repairResponseHandlers.get(i);
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
// behind on writes in case the out-of-sync row is read multiple times in quick succession
- FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
+ FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
Row row;
try
@@ -903,7 +903,7 @@ public class StorageProxy implements StorageProxyMBean
columnsCount += row.getLiveColumnCount();
logger.debug("range slices read {}", row.key);
}
- FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
+ FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
catch (TimeoutException ex)
{
@@ -1237,7 +1237,7 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb))
{
MessagingService.instance().incrementDroppedMessages(verb);
return;
@@ -1282,13 +1282,18 @@ public class StorageProxy implements StorageProxyMBean
logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report");
}
- public Long getRpcTimeout()
- {
- return DatabaseDescriptor.getRpcTimeout();
- }
+ public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
+ public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
- public void setRpcTimeout(Long timeoutInMillis)
- {
- DatabaseDescriptor.setRpcTimeout(timeoutInMillis);
- }
+ public Long getReadRpcTimeout() { return DatabaseDescriptor.getReadRpcTimeout(); }
+ public void setReadRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setReadRpcTimeout(timeoutInMillis); }
+
+ public Long getWriteRpcTimeout() { return DatabaseDescriptor.getWriteRpcTimeout(); }
+ public void setWriteRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setWriteRpcTimeout(timeoutInMillis); }
+
+ public Long getRangeRpcTimeout() { return DatabaseDescriptor.getRangeRpcTimeout(); }
+ public void setRangeRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRangeRpcTimeout(timeoutInMillis); }
+
+ public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
+ public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index f1fb25d..dd1541a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -48,4 +48,12 @@ public interface StorageProxyMBean
public Long getRpcTimeout();
public void setRpcTimeout(Long timeoutInMillis);
+ public Long getReadRpcTimeout();
+ public void setReadRpcTimeout(Long timeoutInMillis);
+ public Long getWriteRpcTimeout();
+ public void setWriteRpcTimeout(Long timeoutInMillis);
+ public Long getRangeRpcTimeout();
+ public void setRangeRpcTimeout(Long timeoutInMillis);
+ public Long getTruncateRpcTimeout();
+ public void setTruncateRpcTimeout(Long timeoutInMillis);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index e5dc93d..e17211c 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -49,7 +49,7 @@ public class TruncateResponseHandler implements IAsyncCallback
public void get() throws TimeoutException
{
- long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ long timeout = DatabaseDescriptor.getTruncateRpcTimeout() - (System.currentTimeMillis() - startTime);
boolean success;
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6610e46/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 54bf927..72a4677 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -104,7 +104,7 @@ public class CassandraServer implements Cassandra.Iface
List<Row> rows;
try
{
- schedule(DatabaseDescriptor.getRpcTimeout());
+ schedule(DatabaseDescriptor.getReadRpcTimeout());
try
{
rows = StorageProxy.read(commands, consistency_level);
@@ -626,7 +626,7 @@ public class CassandraServer implements Cassandra.Iface
return;
try
{
- schedule(DatabaseDescriptor.getRpcTimeout());
+ schedule(DatabaseDescriptor.getWriteRpcTimeout());
try
{
StorageProxy.mutate(mutations, consistency_level);
@@ -685,7 +685,7 @@ public class CassandraServer implements Cassandra.Iface
{
bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
}
- schedule(DatabaseDescriptor.getRpcTimeout());
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IFilter filter = ThriftValidation.asIFilter(predicate, metadata.getComparatorFor(column_parent.super_column));
@@ -745,7 +745,7 @@ public class CassandraServer implements Cassandra.Iface
List<Row> rows;
try
{
- schedule(DatabaseDescriptor.getRpcTimeout());
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
@@ -1091,7 +1091,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
try
{
- schedule(DatabaseDescriptor.getRpcTimeout());
+ schedule(DatabaseDescriptor.getTruncateRpcTimeout());
try
{
StorageProxy.truncateBlocking(cState.getKeyspace(), cfname);