You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/09 04:37:57 UTC

[2/3] git commit: Add failure handler to async callback

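Add a failure handler to async callbacks. Callbacks that implement the new
IAsyncCallbackWithFailure interface and are registered through
MessagingService.sendRRWithFailure() now have onFailure() invoked when the
remote verb handler throws or when the request times out. The repair prepare
phase (ActiveRepairService) and snapshot requests (SnapshotTask) use this to
report a failing node instead of waiting for a reply that will never arrive.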

patch by sankalp kohli; reviewed by yukim for CASSANDRA-6747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a5b90ed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a5b90ed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a5b90ed

Branch: refs/heads/trunk
Commit: 8a5b90ede0701e05f776e6adccc71f6fbd43e3aa
Parents: 66b3b2b
Author: sankalp kohli <oh...@gmail.com>
Authored: Tue Apr 8 21:37:00 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Apr 8 21:37:00 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/net/CallbackInfo.java  | 15 ++++++++-
 .../net/IAsyncCallbackWithFailure.java          | 29 +++++++++++++++++
 .../cassandra/net/MessageDeliveryTask.java      | 16 +++++++++-
 .../org/apache/cassandra/net/MessageIn.java     | 10 ++++++
 .../apache/cassandra/net/MessagingService.java  | 33 +++++++++++++++-----
 .../cassandra/net/ResponseVerbHandler.java      | 12 +++++--
 .../apache/cassandra/repair/SnapshotTask.java   | 21 ++++++++-----
 .../cassandra/service/ActiveRepairService.java  | 23 +++++++++++---
 .../cassandra/service/StorageService.java       | 12 ++++++-
 10 files changed, 146 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f1e8b20..94dd7c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -42,6 +42,7 @@
  * Lock counter cells, not partitions (CASSANDRA-6880)
  * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
  * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
+ * Add failure handler to async callback (CASSANDRA-6747)
 Merged from 2.0:
  * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
  * Allow compaction of system tables during startup (CASSANDRA-6913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
     protected final InetAddress target;
     protected final IAsyncCallback callback;
     protected final IVersionedSerializer<?> serializer;
+    private final boolean failureCallback;
+
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    {
+        this(target, callback, serializer, false);
+    }
 
     /**
      * Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
      * @param callback
      * @param serializer serializer to deserialize response message
      */
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {
         this.target = target;
         this.callback = callback;
         this.serializer = serializer;
+        this.failureCallback = failureCallback;
     }
 
     public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
         return false;
     }
 
+    public boolean isFailureCallback()
+    {
+        return failureCallback;
+    }
+
     public String toString()
     {
         return "CallbackInfo(" +
                "target=" + target +
                ", callback=" + callback +
                ", serializer=" + serializer +
+               ", failureCallback=" + failureCallback +
                ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..744bb62
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+
+    /**
+     * Called when there is an exception on the remote node or timeout happens
+     */
+    public void onFailure(InetAddress from);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..387817b 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
             return;
         }
 
-        verbHandler.doVerb(message, id);
+        try
+        {
+            verbHandler.doVerb(message, id);
+        }
+        catch (Throwable t)
+        {
+            if (message.isFailureCallback())
+            {
+                MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                                                    .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+                MessagingService.instance().sendReply(response, id, message.from);
+            }
+
+            throw t;
+        }
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..b37ca2b 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public boolean isFailureCallback()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+    }
+
+    public boolean isFailureResponse()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+    }
+
     public long getTimeout()
     {
         return DatabaseDescriptor.getTimeout(verb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ab12020..6d9a1b5 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_21 = 8;
     public static final int current_version = VERSION_21;
 
+    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+    public static final byte[] ONE_BYTE = new byte[1];
+    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
-        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
         {
             public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
-                CallbackInfo expiredCallbackInfo = pair.right.value;
+                final CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
                 ConnectionMetrics.totalTimeouts.mark();
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+                if (expiredCallbackInfo.isFailureCallback())
+                {
+                    StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+                        }
+                    });
+                }
 
                 if (expiredCallbackInfo.shouldHint())
                 {
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -564,7 +576,12 @@ public final class MessagingService implements MessagingServiceMBean
 
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
-        return sendRR(message, to, cb, message.getTimeout());
+        return sendRR(message, to, cb, message.getTimeout(), false);
+    }
+
+    public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+    {
+        return sendRR(message, to, cb, message.getTimeout(), true);
     }
 
     /**
@@ -581,10 +598,10 @@ public final class MessagingService implements MessagingServiceMBean
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
-        int id = addCallback(cb, message, to, timeout);
-        sendOneWay(message, id, to);
+        int id = addCallback(cb, message, to, timeout, failureCallback);
+        sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
 
         Tracing.trace("Processing response from {}", message.from);
         IAsyncCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.from, latency);
-        cb.response(message);
+        if (message.isFailureResponse())
+        {
+            ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+        }
+        else
+        {
+            //TODO: Should we add latency only in success cases?
+            MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+            cb.response(message);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..cb5003a 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
     public void run()
     {
-        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
-                                                               desc.columnFamily,
-                                                               desc.sessionId.toString(),
-                                                               false).createMessage(),
-                                           endpoint,
-                                           new SnapshotCallback(this));
+        MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+                desc.columnFamily,
+                desc.sessionId.toString(),
+                false).createMessage(),
+                endpoint,
+                new SnapshotCallback(this));
     }
 
     /**
      * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
      */
-    static class SnapshotCallback implements IAsyncCallback
+    static class SnapshotCallback implements IAsyncCallbackWithFailure
     {
         final SnapshotTask task;
 
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
         }
 
         public boolean isLatencyForSnitch() { return false; }
+
+        public void onFailure(InetAddress from)
+        {
+            task.setException(new RuntimeException("Could not create snapshot at " + from));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 83d8727..7a7aac9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -43,7 +44,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -249,19 +250,24 @@ public class ActiveRepairService
         UUID parentRepairSession = UUIDGen.getTimeUUID();
         registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
-        IAsyncCallback callback = new IAsyncCallback()
+        final AtomicBoolean status = new AtomicBoolean(true);
+        IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure()
         {
-            @Override
             public void response(MessageIn msg)
             {
                 prepareLatch.countDown();
             }
 
-            @Override
             public boolean isLatencyForSnitch()
             {
                 return false;
             }
+
+            public void onFailure(InetAddress from)
+            {
+                status.set(false);
+                prepareLatch.countDown();
+            }
         };
 
         List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
@@ -272,7 +278,7 @@ public class ActiveRepairService
         {
             PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
             MessageOut<RepairMessage> msg = message.createMessage();
-            MessagingService.instance().sendRR(msg, neighbour, callback);
+            MessagingService.instance().sendRRWithFailure(msg, neighbour, callback);
         }
         try
         {
@@ -283,6 +289,13 @@ public class ActiveRepairService
             parentRepairSessions.remove(parentRepairSession);
             throw new RuntimeException("Did not get replies from all endpoints.", e);
         }
+
+        if (!status.get())
+        {
+            parentRepairSessions.remove(parentRepairSession);
+            throw new RuntimeException("Did not get positive replies from all endpoints.");
+        }
+
         return parentRepairSession;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9f8158f..3e94172 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2642,7 +2642,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                 UUID parentSession = null;
                 if (!fullRepair)
-                    parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+                {
+                    try
+                    {
+                        parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+                    }
+                    catch (Throwable t)
+                    {
+                        sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                        return;
+                    }
+                }
 
                 List<RepairFuture> futures = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)