You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/04 23:32:33 UTC

[2/3] git commit: Revert CASSANDRA-6132 (pushing to 2.0 only)

Revert CASSANDRA-6132 (pushing to 2.0 only)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf38e9e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf38e9e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf38e9e6

Branch: refs/heads/cassandra-2.0
Commit: cf38e9e6f96e692909e7669c053e372a638605e7
Parents: 731e83b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Oct 4 16:31:00 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Oct 4 16:31:29 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../cassandra/config/DatabaseDescriptor.java    |  3 --
 .../org/apache/cassandra/dht/BootStrapper.java  |  8 ++---
 .../org/apache/cassandra/net/CallbackInfo.java  | 17 +++++++--
 .../apache/cassandra/net/MessagingService.java  | 28 +++++++--------
 .../apache/cassandra/net/WriteCallbackInfo.java | 26 --------------
 .../apache/cassandra/service/StorageProxy.java  | 38 +++++---------------
 7 files changed, 40 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1d1991..3dc5e77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
 1.2.11
- * Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
  * Tracing should log write failure rather than raw exceptions (CASSANDRA-6133)
  * lock access to TM.endpointToHostIdMap (CASSANDRA-6103)
  * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 218f719..633ea9a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DefsTable;
 import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
@@ -840,8 +839,6 @@ public class DatabaseDescriptor
             case READ_REPAIR:
             case MUTATION:
                 return getWriteRpcTimeout();
-            case BOOTSTRAP_TOKEN:
-                return BootStrapper.BOOTSTRAP_TIMEOUT;
             default:
                 return getRpcTimeout();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 2e79562..ff76534 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -48,8 +48,6 @@ import org.apache.cassandra.net.*;
 
 public class BootStrapper
 {
-    public static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
-
     private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
 
     /* endpoint that needs to be bootstrapped */
@@ -57,6 +55,7 @@ public class BootStrapper
     /* token of the node being bootstrapped. */
     protected final Collection<Token> tokens;
     protected final TokenMetadata tokenMetadata;
+    private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
 
     public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd)
     {
@@ -188,12 +187,13 @@ public class BootStrapper
     {
         MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
         int retries = 5;
+        long timeout = Math.max(DatabaseDescriptor.getRpcTimeout(), BOOTSTRAP_TIMEOUT);
 
         while (retries > 0)
         {
             BootstrapTokenCallback btc = new BootstrapTokenCallback();
-            MessagingService.instance().sendRR(message, maxEndpoint, btc);
-            Token token = btc.getToken(BOOTSTRAP_TIMEOUT);
+            MessagingService.instance().sendRR(message, maxEndpoint, btc, timeout);
+            Token token = btc.getToken(timeout);
             if (token != null)
                 return token;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index f90df8d..f0e48e9 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,7 @@ public class CallbackInfo
 {
     protected final InetAddress target;
     protected final IMessageCallback callback;
+    protected final MessageOut<?> sentMessage;
     protected final IVersionedSerializer<?> serializer;
 
     /**
@@ -40,15 +41,27 @@ public class CallbackInfo
      * @param callback
      * @param serializer serializer to deserialize response message
      */
-   public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+    public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
+    {
+        this(target, callback, null, serializer);
+    }
+
+    public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
     {
         this.target = target;
         this.callback = callback;
+        this.sentMessage = sentMessage;
         this.serializer = serializer;
     }
 
+    /**
+     * @return TRUE iff a hint should be written for this target.
+     *
+     * NOTE:
+     * Assumes it is only called after the write of "sentMessage" to "target" has timed out.
+     */
     public boolean shouldHint()
     {
-        return false;
+        return sentMessage != null && StorageProxy.shouldHint(target);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c9b0047..a199e83 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -329,7 +329,8 @@ public final class MessagingService implements MessagingServiceMBean
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
+                    assert expiredCallbackInfo.sentMessage != null;
+                    RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
                     return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
                 }
 
@@ -521,18 +522,15 @@ public final class MessagingService implements MessagingServiceMBean
 
     public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
     {
-        assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         String messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
-        assert previous == null;
-        return messageId;
-    }
+        CallbackInfo previous;
+
+        // If HH is enabled and this is a mutation message => store the message to track for potential hints.
+        if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
+        else
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
 
-    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
-    {
-        assert message.verb == Verb.MUTATION;
-        String messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
         assert previous == null;
         return messageId;
     }
@@ -550,7 +548,7 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
     {
-        return sendRR(message, to, cb, message.getTimeout(), null);
+        return sendRR(message, to, cb, message.getTimeout());
     }
 
     /**
@@ -567,11 +565,9 @@ public final class MessagingService implements MessagingServiceMBean
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout, ConsistencyLevel consistencyLevel)
+    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
     {
-        String id = consistencyLevel == null
-                  ? addCallback(cb, message, to, timeout)
-                  : addCallback(cb, message, to, timeout, consistencyLevel);
+        String id = addCallback(cb, message, to, timeout);
 
         if (cb instanceof AbstractWriteResponseHandler)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
deleted file mode 100644
index 8badbcf..0000000
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.cassandra.net;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.service.StorageProxy;
-
-public class WriteCallbackInfo extends CallbackInfo
-{
-    public final MessageOut sentMessage;
-    private final ConsistencyLevel consistencyLevel;
-
-    public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel)
-    {
-        super(target, callback, serializer);
-        assert message != null;
-        this.sentMessage = message;
-        this.consistencyLevel = consistencyLevel;
-    }
-
-    public boolean shouldHint()
-    {
-        return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf38e9e6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c0b7271..8a6e52e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -196,34 +196,14 @@ public class StorageProxy implements StorageProxyMBean
             {
                 responseHandler.get();
             }
+
         }
         catch (WriteTimeoutException ex)
         {
-            if (consistency_level == ConsistencyLevel.ANY)
-            {
-                // hint all the mutations (except counters, which can't be safely retried).  This means
-                // we'll re-hint any successful ones; doesn't seem worth it to track individual success
-                // just for this unusual case.
-                for (IMutation mutation : mutations)
-                {
-                    if (mutation instanceof CounterMutation)
-                        continue;
-
-                    Token tk = StorageService.getPartitioner().getToken(mutation.key());
-                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk);
-                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable());
-                    for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
-                        submitHint((RowMutation) mutation, target, null, consistency_level);
-                }
-                Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
-            }
-            else
-            {
-                writeMetrics.timeouts.mark();
-                ClientRequestMetrics.writeTimeouts.inc();
-                Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
-                throw ex;
-            }
+            writeMetrics.timeouts.mark();
+            ClientRequestMetrics.writeTimeouts.inc();
+            Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+            throw ex;
         }
         catch (UnavailableException e)
         {
@@ -642,11 +622,11 @@ public class StorageProxy implements StorageProxyMBean
         {
             // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
             // creating a second iterator since we already have a perfectly good one
-            MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+            MessagingService.instance().sendRR(message, target, handler);
             while (iter.hasNext())
             {
                 target = iter.next();
-                MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+                MessagingService.instance().sendRR(message, target, handler);
             }
             return;
         }
@@ -659,13 +639,13 @@ public class StorageProxy implements StorageProxyMBean
         {
             InetAddress destination = iter.next();
             CompactEndpointSerializationHelper.serialize(destination, dos);
-            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel);
+            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
             dos.writeUTF(id);
         }
         message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
         // send the combined message + forward headers
         Tracing.trace("Enqueuing message to {}", target);
-        MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel);
+        MessagingService.instance().sendRR(message, target, handler);
     }
 
     private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)