You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/03 02:09:45 UTC
[1/8] git commit: 6132
Updated Branches:
refs/heads/cassandra-1.2 5440a0a67 -> 6b5874503
refs/heads/cassandra-2.0 76cb10ca9 -> e7c90e04c
refs/heads/trunk 87e19fc8f -> 70dc32023
6132
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5440a0a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5440a0a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5440a0a6
Branch: refs/heads/cassandra-2.0
Commit: 5440a0a6767544d6ea1ba34f5d2a3e223f260fb5
Parents: 64890d8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 14:09:29 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 14:09:29 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/CallbackInfo.java | 17 ++----------
.../apache/cassandra/net/MessagingService.java | 20 ++++++++-------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++----
4 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index f0e48e9..f90df8d 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,7 +31,6 @@ public class CallbackInfo
{
protected final InetAddress target;
protected final IMessageCallback callback;
- protected final MessageOut<?> sentMessage;
protected final IVersionedSerializer<?> serializer;
/**
@@ -41,27 +40,15 @@ public class CallbackInfo
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
- {
- this(target, callback, null, serializer);
- }
-
- public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
{
this.target = target;
this.callback = callback;
- this.sentMessage = sentMessage;
this.serializer = serializer;
}
- /**
- * @return TRUE iff a hint should be written for this target.
- *
- * NOTE:
- * Assumes it is only called after the write of "sentMessage" to "target" has timed out.
- */
public boolean shouldHint()
{
- return sentMessage != null && StorageProxy.shouldHint(target);
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a199e83..dd02ca6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -329,8 +329,7 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- assert expiredCallbackInfo.sentMessage != null;
- RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
+ RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
}
@@ -522,15 +521,18 @@ public final class MessagingService implements MessagingServiceMBean
public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
{
+ assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
String messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null;
+ return messageId;
+ }
+ public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+ {
+ assert message.verb == Verb.MUTATION;
+ String messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null;
return messageId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
new file mode 100644
index 0000000..8badbcf
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -0,0 +1,26 @@
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.StorageProxy;
+
+public class WriteCallbackInfo extends CallbackInfo
+{
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
+
+ public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public boolean shouldHint()
+ {
+ return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8a6e52e..b23de1f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -196,14 +196,31 @@ public class StorageProxy implements StorageProxyMBean
{
responseHandler.get();
}
-
}
catch (WriteTimeoutException ex)
{
- writeMetrics.timeouts.mark();
- ClientRequestMetrics.writeTimeouts.inc();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ for (IMutation mutation : mutations)
+ {
+ if (mutation instanceof CounterMutation)
+ continue;
+
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
+ for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+ submitHint((RowMutation) mutation, target, null, consistency_level);
+ }
+ Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+ }
+ else
+ {
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+ throw ex;
+ }
}
catch (UnavailableException e)
{
[3/8] git commit: CHANGES
Posted by jb...@apache.org.
CHANGES
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b587450
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b587450
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b587450
Branch: refs/heads/cassandra-2.0
Commit: 6b5874503e0bce67460a960343c65f76066f4b69
Parents: 5440a0a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:01:59 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:01:59 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b587450/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5267709..cc04eca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.11
+ * Never return WriteTimeout for CL.ANY (CASSANDRA-6132)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)
[7/8] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7c90e04
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7c90e04
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7c90e04
Branch: refs/heads/cassandra-2.0
Commit: e7c90e04c1e85609ecc80099ad9b5d81b62828be
Parents: 76cb10c 6b58745
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:09:24 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:09:24 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/CallbackInfo.java | 16 +-----------
.../apache/cassandra/net/MessagingService.java | 20 ++++++++-------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++----
5 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f012ed1,cc04eca..c1023f6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
-1.2.11
+2.0.2
+ * Add configurable metrics reporting (CASSANDRA-4430)
+ * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117)
+ * Track and persist sstable read activity (CASSANDRA-5515)
+ * Fixes for speculative retry (CASSANDRA-5932)
+ * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
+ * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
+ * Fix insertion of collections with CAS (CASSANDRA-6069)
+ * Correctly send metadata on SELECT COUNT (CASSANDRA-6080)
+ * Track clients' remote addresses in ClientState (CASSANDRA-6070)
+ * Create snapshot dir if it does not exist when migrating
+ leveled manifest (CASSANDRA-6093)
+ * make sequential nodetool repair the default (CASSANDRA-5950)
+ * Add more hooks for compaction strategy implementations (CASSANDRA-6111)
+ * Fix potential NPE on composite 2ndary indexes (CASSANDRA-6098)
+ * Delete can potentially be skipped in batch (CASSANDRA-6115)
+ * Allow alter keyspace on system_traces (CASSANDRA-6016)
+Merged from 1.2:
+ * Never return WriteTimeout for CL.ANY (CASSANDRA-6132)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/CallbackInfo.java
index bb1a4e0,f90df8d..0edfee9
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@@ -20,7 -20,7 +20,6 @@@ package org.apache.cassandra.net
import java.net.InetAddress;
import org.apache.cassandra.io.IVersionedSerializer;
--import org.apache.cassandra.service.StorageProxy;
/**
* Encapsulates the callback information.
@@@ -30,8 -30,7 +29,7 @@@
public class CallbackInfo
{
protected final InetAddress target;
- protected final IMessageCallback callback;
+ protected final IAsyncCallback callback;
- protected final MessageOut<?> sentMessage;
protected final IVersionedSerializer<?> serializer;
/**
@@@ -41,16 -40,10 +39,10 @@@
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
{
- this(target, callback, null, serializer);
- }
-
- public CallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
- {
this.target = target;
this.callback = callback;
- this.sentMessage = sentMessage;
this.serializer = serializer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 33e3bfb,dd02ca6..ca3845b
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -538,17 -519,20 +537,20 @@@ public final class MessagingService imp
return verbHandlers.get(type);
}
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
{
+ assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
- String messageId = nextId();
+ int messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null;
+ return messageId;
+ }
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
++ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+ {
+ assert message.verb == Verb.MUTATION;
- String messageId = nextId();
++ int messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null;
return messageId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 0000000,8badbcf..abded75
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,0 -1,26 +1,26 @@@
+ package org.apache.cassandra.net;
+
+ import java.net.InetAddress;
+
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.io.IVersionedSerializer;
+ import org.apache.cassandra.service.StorageProxy;
+
+ public class WriteCallbackInfo extends CallbackInfo
+ {
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
+
- public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
++ public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public boolean shouldHint()
+ {
+ return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 51f171d,b23de1f..c430bc2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -516,11 -199,28 +516,28 @@@ public class StorageProxy implements St
}
catch (WriteTimeoutException ex)
{
- writeMetrics.timeouts.mark();
- ClientRequestMetrics.writeTimeouts.inc();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ for (IMutation mutation : mutations)
+ {
+ if (mutation instanceof CounterMutation)
+ continue;
+
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
++ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
++ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+ for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+ submitHint((RowMutation) mutation, target, null, consistency_level);
+ }
+ Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+ }
+ else
+ {
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+ throw ex;
+ }
}
catch (UnavailableException e)
{
[8/8] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70dc3202
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70dc3202
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70dc3202
Branch: refs/heads/trunk
Commit: 70dc32023d759716c4bdcf92060e3d977a7f588b
Parents: 87e19fc e7c90e0
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:09:30 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:09:30 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/CallbackInfo.java | 16 +-----------
.../apache/cassandra/net/MessagingService.java | 20 ++++++++-------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++----
5 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70dc3202/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70dc3202/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/70dc3202/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
[2/8] git commit: 6132
Posted by jb...@apache.org.
6132
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5440a0a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5440a0a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5440a0a6
Branch: refs/heads/trunk
Commit: 5440a0a6767544d6ea1ba34f5d2a3e223f260fb5
Parents: 64890d8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 14:09:29 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 14:09:29 2013 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/net/CallbackInfo.java | 17 ++----------
.../apache/cassandra/net/MessagingService.java | 20 ++++++++-------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++----
4 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index f0e48e9..f90df8d 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,7 +31,6 @@ public class CallbackInfo
{
protected final InetAddress target;
protected final IMessageCallback callback;
- protected final MessageOut<?> sentMessage;
protected final IVersionedSerializer<?> serializer;
/**
@@ -41,27 +40,15 @@ public class CallbackInfo
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
- {
- this(target, callback, null, serializer);
- }
-
- public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
{
this.target = target;
this.callback = callback;
- this.sentMessage = sentMessage;
this.serializer = serializer;
}
- /**
- * @return TRUE iff a hint should be written for this target.
- *
- * NOTE:
- * Assumes it is only called after the write of "sentMessage" to "target" has timed out.
- */
public boolean shouldHint()
{
- return sentMessage != null && StorageProxy.shouldHint(target);
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a199e83..dd02ca6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -329,8 +329,7 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- assert expiredCallbackInfo.sentMessage != null;
- RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
+ RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
}
@@ -522,15 +521,18 @@ public final class MessagingService implements MessagingServiceMBean
public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
{
+ assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
String messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null;
+ return messageId;
+ }
+ public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+ {
+ assert message.verb == Verb.MUTATION;
+ String messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null;
return messageId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
new file mode 100644
index 0000000..8badbcf
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -0,0 +1,26 @@
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.StorageProxy;
+
+public class WriteCallbackInfo extends CallbackInfo
+{
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
+
+ public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public boolean shouldHint()
+ {
+ return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8a6e52e..b23de1f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -196,14 +196,31 @@ public class StorageProxy implements StorageProxyMBean
{
responseHandler.get();
}
-
}
catch (WriteTimeoutException ex)
{
- writeMetrics.timeouts.mark();
- ClientRequestMetrics.writeTimeouts.inc();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ for (IMutation mutation : mutations)
+ {
+ if (mutation instanceof CounterMutation)
+ continue;
+
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
+ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
+ for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+ submitHint((RowMutation) mutation, target, null, consistency_level);
+ }
+ Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+ }
+ else
+ {
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+ throw ex;
+ }
}
catch (UnavailableException e)
{
[5/8] git commit: CHANGES
Posted by jb...@apache.org.
CHANGES
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b587450
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b587450
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b587450
Branch: refs/heads/cassandra-1.2
Commit: 6b5874503e0bce67460a960343c65f76066f4b69
Parents: 5440a0a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:01:59 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:01:59 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b587450/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5267709..cc04eca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.11
+ * Never return WriteTimeout for CL.ANY (CASSANDRA-6132)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)
[6/8] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7c90e04
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7c90e04
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7c90e04
Branch: refs/heads/trunk
Commit: e7c90e04c1e85609ecc80099ad9b5d81b62828be
Parents: 76cb10c 6b58745
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:09:24 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:09:24 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/CallbackInfo.java | 16 +-----------
.../apache/cassandra/net/MessagingService.java | 20 ++++++++-------
.../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++----
5 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f012ed1,cc04eca..c1023f6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
-1.2.11
+2.0.2
+ * Add configurable metrics reporting (CASSANDRA-4430)
+ * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117)
+ * Track and persist sstable read activity (CASSANDRA-5515)
+ * Fixes for speculative retry (CASSANDRA-5932)
+ * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
+ * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
+ * Fix insertion of collections with CAS (CASSANDRA-6069)
+ * Correctly send metadata on SELECT COUNT (CASSANDRA-6080)
+ * Track clients' remote addresses in ClientState (CASSANDRA-6070)
+ * Create snapshot dir if it does not exist when migrating
+ leveled manifest (CASSANDRA-6093)
+ * make sequential nodetool repair the default (CASSANDRA-5950)
+ * Add more hooks for compaction strategy implementations (CASSANDRA-6111)
+ * Fix potential NPE on composite 2ndary indexes (CASSANDRA-6098)
+ * Delete can potentially be skipped in batch (CASSANDRA-6115)
+ * Allow alter keyspace on system_traces (CASSANDRA-6016)
+Merged from 1.2:
+ * Never return WriteTimeout for CL.ANY (CASSANDRA-6132)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/CallbackInfo.java
index bb1a4e0,f90df8d..0edfee9
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@@ -20,7 -20,7 +20,6 @@@ package org.apache.cassandra.net
import java.net.InetAddress;
import org.apache.cassandra.io.IVersionedSerializer;
--import org.apache.cassandra.service.StorageProxy;
/**
* Encapsulates the callback information.
@@@ -30,8 -30,7 +29,7 @@@
public class CallbackInfo
{
protected final InetAddress target;
- protected final IMessageCallback callback;
+ protected final IAsyncCallback callback;
- protected final MessageOut<?> sentMessage;
protected final IVersionedSerializer<?> serializer;
/**
@@@ -41,16 -40,10 +39,10 @@@
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
{
- this(target, callback, null, serializer);
- }
-
- public CallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
- {
this.target = target;
this.callback = callback;
- this.sentMessage = sentMessage;
this.serializer = serializer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 33e3bfb,dd02ca6..ca3845b
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -538,17 -519,20 +537,20 @@@ public final class MessagingService imp
return verbHandlers.get(type);
}
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
{
+ assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
- String messageId = nextId();
+ int messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null;
+ return messageId;
+ }
- public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
++ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+ {
+ assert message.verb == Verb.MUTATION;
- String messageId = nextId();
++ int messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null;
return messageId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 0000000,8badbcf..abded75
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,0 -1,26 +1,26 @@@
+ package org.apache.cassandra.net;
+
+ import java.net.InetAddress;
+
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.io.IVersionedSerializer;
+ import org.apache.cassandra.service.StorageProxy;
+
+ public class WriteCallbackInfo extends CallbackInfo
+ {
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
+
- public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
++ public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public boolean shouldHint()
+ {
+ return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 51f171d,b23de1f..c430bc2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -516,11 -199,28 +516,28 @@@ public class StorageProxy implements St
}
catch (WriteTimeoutException ex)
{
- writeMetrics.timeouts.mark();
- ClientRequestMetrics.writeTimeouts.inc();
- Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
- throw ex;
+ if (consistency_level == ConsistencyLevel.ANY)
+ {
+ for (IMutation mutation : mutations)
+ {
+ if (mutation instanceof CounterMutation)
+ continue;
+
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
++ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
++ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+ for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+ submitHint((RowMutation) mutation, target, null, consistency_level);
+ }
+ Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+ }
+ else
+ {
+ writeMetrics.timeouts.mark();
+ ClientRequestMetrics.writeTimeouts.inc();
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+ throw ex;
+ }
}
catch (UnavailableException e)
{
[4/8] git commit: CHANGES
Posted by jb...@apache.org.
CHANGES
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b587450
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b587450
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b587450
Branch: refs/heads/trunk
Commit: 6b5874503e0bce67460a960343c65f76066f4b69
Parents: 5440a0a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:01:59 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:01:59 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b587450/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5267709..cc04eca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.11
+ * Never return WriteTimeout for CL.ANY (CASSANDRA-6132)
* Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
* lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
* Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)