You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/04 23:32:33 UTC
[2/3] git commit: Revert CASSANDRA-6132 (pushing to 2.0 only)
Revert CASSANDRA-6132 (pushing to 2.0 only)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf38e9e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf38e9e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf38e9e6
Branch: refs/heads/cassandra-2.0
Commit: cf38e9e6f96e692909e7669c053e372a638605e7
Parents: 731e83b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Oct 4 16:31:00 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Oct 4 16:31:29 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../cassandra/config/DatabaseDescriptor.java | 3 --
.../org/apache/cassandra/dht/BootStrapper.java | 8 ++---
.../org/apache/cassandra/net/CallbackInfo.java | 17 +++++++--
.../apache/cassandra/net/MessagingService.java | 28 +++++++--------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 --------------
.../apache/cassandra/service/StorageProxy.java | 38 +++++---------------
7 files changed, 40 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1d1991..3dc5e77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
1.2.11
- * Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 218f719..633ea9a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
@@ -840,8 +839,6 @@ public class DatabaseDescriptor
case READ_REPAIR:
case MUTATION:
return getWriteRpcTimeout();
- case BOOTSTRAP_TOKEN:
- return BootStrapper.BOOTSTRAP_TIMEOUT;
default:
return getRpcTimeout();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 2e79562..ff76534 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -48,8 +48,6 @@ import org.apache.cassandra.net.*;
public class BootStrapper
{
- public static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
-
private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
/* endpoint that needs to be bootstrapped */
@@ -57,6 +55,7 @@ public class BootStrapper
/* token of the node being bootstrapped. */
protected final Collection<Token> tokens;
protected final TokenMetadata tokenMetadata;
+ private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd)
{
@@ -188,12 +187,13 @@ public class BootStrapper
{
MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
int retries = 5;
+ long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT);
while (retries > 0)
{
BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.instance().sendRR(message, maxEndpoint, btc);
- Token token = btc.getToken(BOOTSTRAP_TIMEOUT);
+ MessagingService.instance().sendRR(message, maxEndpoint, btc, timeout);
+ Token token = btc.getToken(timeout);
if (token != null)
return token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index f90df8d..f0e48e9 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,7 @@ public class CallbackInfo
{
protected final InetAddress target;
protected final IMessageCallback callback;
+ protected final MessageOut<?> sentMessage;
protected final IVersionedSerializer<?> serializer;
/**
@@ -40,15 +41,27 @@ public class CallbackInfo
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+ {
+ this(target, callback, null, serializer);
+ }
+
+ public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
{
this.target = target;
this.callback = callback;
+ this.sentMessage = sentMessage;
this.serializer = serializer;
}
+ /**
+ * @return TRUE iff a hint should be written for this target.
+ *
+ * NOTE:
+ * Assumes it is only called after the write of "sentMessage" to "target" has timed out.
+ */
public boolean shouldHint()
{
- return false;
+ return sentMessage != null && StorageProxy.shouldHint(target);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c9b0047..a199e83 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -329,7 +329,8 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
+ assert expiredCallbackInfo.sentMessage != null;
+ RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
}
@@ -521,18 +522,15 @@ public final class MessagingService implements MessagingServiceMBean
public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
{
- assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
String messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
- assert previous == null;
- return messageId;
- }
+ CallbackInfo previous;
+
+ // If HH is enabled and this is a mutation message => store the message to track for potential hints.
+ if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
+ previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
+ else
+ previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
- {
- assert message.verb == Verb.MUTATION;
- String messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null;
return messageId;
}
@@ -550,7 +548,7 @@ public final class MessagingService implements MessagingServiceMBean
*/
public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
{
- return sendRR(message, to, cb, message.getTimeout(), null);
+ return sendRR(message, to, cb, message.getTimeout());
}
/**
@@ -567,11 +565,9 @@ public final class MessagingService implements MessagingServiceMBean
* @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
- public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout, ConsistencyLevel consistencyLevel)
+ public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
{
- String id = consistencyLevel == null
- ? addCallback(cb, message, to, timeout)
- : addCallback(cb, message, to, timeout, consistencyLevel);
+ String id = addCallback(cb, message, to, timeout);
if (cb instanceof AbstractWriteResponseHandler)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
deleted file mode 100644
index 8badbcf..0000000
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.cassandra.net;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.service.StorageProxy;
-
-public class WriteCallbackInfo extends CallbackInfo
-{
- public final MessageOut sentMessage;
- private final ConsistencyLevel consistencyLevel;
-
- public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
- {
- super(target, callback, serializer);
- assert message != null;
- this.sentMessage = message;
- this.consistencyLevel = consistencyLevel;
- }
-
- public boolean shouldHint()
- {
- return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c0b7271..8a6e52e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -196,34 +196,14 @@ public class StorageProxy implements StorageProxyMBean
{
responseHandler.get();
}
+
}
catch (WriteTimeoutException ex)
{
- if (consistency_level == ConsistencyLevel.ANY)
- {
- // hint all the mutations (except counters, which can't be safely retried). This means
- // we'll re-hint any successful ones; doesn't seem worth it to track individual success
- // just for this unusual case.
- for (IMutation mutation : mutations)
- {
- if (mutation instanceof CounterMutation)
- continue;
-
- Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
- for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
- submitHint((RowMutation) mutation, target, null, consistency_level);
- }
- Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
- }
- else
- {
- writeMetrics.timeouts.mark();
- ClientRequestMetrics.writeTimeouts.inc();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
- }
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+ throw ex;
}
catch (UnavailableException e)
{
@@ -642,11 +622,11 @@ public class StorageProxy implements StorageProxyMBean
{
// yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
// creating a second iterator since we already have a perfectly good one
- MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+ MessagingService.instance().sendRR(message, target, handler);
while (iter.hasNext())
{
target = iter.next();
- MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+ MessagingService.instance().sendRR(message, target, handler);
}
return;
}
@@ -659,13 +639,13 @@ public class StorageProxy implements StorageProxyMBean
{
InetAddress destination = iter.next();
CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
+ String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
dos.writeUTF(id);
}
message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
// send the combined message + forward headers
Tracing.trace("Enqueuing message to {}", target);
- MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+ MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)