You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/03 02:09:50 UTC

[6/8] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7c90e04
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7c90e04
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7c90e04

Branch: refs/heads/trunk
Commit: e7c90e04c1e85609ecc80099ad9b5d81b62828be
Parents: 76cb10c 6b58745
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 2 19:09:24 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 2 19:09:24 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/net/CallbackInfo.java  | 16 +-----------
 .../apache/cassandra/net/MessagingService.java  | 20 ++++++++-------
 .../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++
 .../apache/cassandra/service/StorageProxy.java  | 27 ++++++++++++++++----
 5 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f012ed1,cc04eca..c1023f6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
 -1.2.11
 +2.0.2
 + * Add configurable metrics reporting (CASSANDRA-4430)
 + * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117)
 + * Track and persist sstable read activity (CASSANDRA-5515)
 + * Fixes for speculative retry (CASSANDRA-5932)
 + * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
 + * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
 + * Fix insertion of collections with CAS (CASSANDRA-6069)
 + * Correctly send metadata on SELECT COUNT (CASSANDRA-6080)
 + * Track clients' remote addresses in ClientState (CASSANDRA-6070)
 + * Create snapshot dir if it does not exist when migrating
 +   leveled manifest (CASSANDRA-6093)
 + * make sequential nodetool repair the default (CASSANDRA-5950)
 + * Add more hooks for compaction strategy implementations (CASSANDRA-6111)
 + * Fix potential NPE on composite 2ndary indexes (CASSANDRA-6098)
 + * Delete can potentially be skipped in batch (CASSANDRA-6115)
 + * Allow alter keyspace on system_traces (CASSANDRA-6016)
 +Merged from 1.2:
+  * Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
   * Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
   * lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
   * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/CallbackInfo.java
index bb1a4e0,f90df8d..0edfee9
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@@ -20,7 -20,7 +20,6 @@@ package org.apache.cassandra.net
  import java.net.InetAddress;
  
  import org.apache.cassandra.io.IVersionedSerializer;
--import org.apache.cassandra.service.StorageProxy;
  
  /**
   * Encapsulates the callback information.
@@@ -30,8 -30,7 +29,7 @@@
  public class CallbackInfo
  {
      protected final InetAddress target;
 -    protected final IMessageCallback callback;
 +    protected final IAsyncCallback callback;
-     protected final MessageOut<?> sentMessage;
      protected final IVersionedSerializer<?> serializer;
  
      /**
@@@ -41,16 -40,10 +39,10 @@@
       * @param callback
       * @param serializer serializer to deserialize response message
       */
 -   public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
 +    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
      {
-         this(target, callback, null, serializer);
-     }
- 
-     public CallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
-     {
          this.target = target;
          this.callback = callback;
-         this.sentMessage = sentMessage;
          this.serializer = serializer;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 33e3bfb,dd02ca6..ca3845b
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -538,17 -519,20 +537,20 @@@ public final class MessagingService imp
          return verbHandlers.get(type);
      }
  
 -    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
 +    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
      {
+         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
 -        String messageId = nextId();
 +        int messageId = nextId();
-         CallbackInfo previous;
- 
-         // If HH is enabled and this is a mutation message => store the message to track for potential hints.
-         if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
-             previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
-         else
-             previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+         CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+         assert previous == null;
+         return messageId;
+     }
  
 -    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
++    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
+     {
+         assert message.verb == Verb.MUTATION;
 -        String messageId = nextId();
++        int messageId = nextId();
+         CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
          assert previous == null;
          return messageId;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 0000000,8badbcf..abded75
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,0 -1,26 +1,26 @@@
+ package org.apache.cassandra.net;
+ 
+ import java.net.InetAddress;
+ 
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.io.IVersionedSerializer;
+ import org.apache.cassandra.service.StorageProxy;
+ 
+ public class WriteCallbackInfo extends CallbackInfo
+ {
+     public final MessageOut sentMessage;
+     private final ConsistencyLevel consistencyLevel;
+ 
 -    public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
++    public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
+     {
+         super(target, callback, serializer);
+         assert message != null;
+         this.sentMessage = message;
+         this.consistencyLevel = consistencyLevel;
+     }
+ 
+     public boolean shouldHint()
+     {
+         return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 51f171d,b23de1f..c430bc2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -516,11 -199,28 +516,28 @@@ public class StorageProxy implements St
          }
          catch (WriteTimeoutException ex)
          {
-             writeMetrics.timeouts.mark();
-             ClientRequestMetrics.writeTimeouts.inc();
-             Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
-             throw ex;
+             if (consistency_level == ConsistencyLevel.ANY)
+             {
+                 for (IMutation mutation : mutations)
+                 {
+                     if (mutation instanceof CounterMutation)
+                         continue;
+ 
+                     Token tk = StorageService.getPartitioner().getToken(mutation.key());
 -                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
 -                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
++                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
++                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+                     for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+                         submitHint((RowMutation) mutation, target, null, consistency_level);
+                 }
+                 Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+             }
+             else
+             {
+                 writeMetrics.timeouts.mark();
+                 ClientRequestMetrics.writeTimeouts.inc();
+                 Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+                 throw ex;
+             }
          }
          catch (UnavailableException e)
          {