You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/07/29 23:10:44 UTC

[1/5] git commit: Handle KeyboardInterrupt properly in cqlsh

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 b44bbb83a -> 7e1adb497
  refs/heads/cassandra-2.1.0 6843b556f -> 4669ab969


Handle KeyboardInterrupt properly in cqlsh

Patch by Jordon Pittier, reviewed by brandonwilliams for CASSANDRA-5481


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/249bbfc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/249bbfc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/249bbfc3

Branch: refs/heads/cassandra-2.1.0
Commit: 249bbfc3b205d934764e5b05e955378f78c28480
Parents: 192596a
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 29 13:57:42 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 29 13:57:42 2014 -0500

----------------------------------------------------------------------
 bin/cqlsh | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/249bbfc3/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index bb0ffdf..d2a503d 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -504,15 +504,9 @@ class Shell(cmd.Cmd):
             self.conn = use_conn
         else:
             transport = transport_factory(hostname, port, os.environ, CONFIG_FILE)
-            self.conn = cql.connect(hostname, port, user=username, password=password,
-                                    cql_version=cqlver, transport=transport)
+            self.conn = cql.connect(hostname, port, keyspace=keyspace, user=username, 
+                                    password=password, cql_version=cqlver, transport=transport)
         self.set_expanded_cql_version(cqlver)
-        # we could set the keyspace through cql.connect(), but as of 1.0.10,
-        # it doesn't quote the keyspace for USE :(
-        if keyspace is not None:
-            tempcurs = self.conn.cursor()
-            tempcurs.execute('USE %s;' % self.cql_protect_name(keyspace))
-            tempcurs.close()
         self.cursor = self.conn.cursor()
         self.get_connection_versions()
 
@@ -1056,6 +1050,16 @@ class Shell(cmd.Cmd):
             except CQL_ERRORS, err:
                 self.printerr(str(err))
                 return False
+            except KeyboardInterrupt:
+                self.cursor.close()
+                self.conn.terminate_connection()
+                transport = self.transport_factory(self.hostname, self.port,
+                                                   os.environ, CONFIG_FILE)
+                self.conn = cql.connect(self.hostname, self.port, keyspace=self.keyspace,
+                                        user=self.username, password=self.password,
+                                        cql_version=self.cql_version, transport=transport)
+                self.cursor = self.conn.cursor()
+                return False                
             except Exception, err:
                 import traceback
                 self.printerr(traceback.format_exc())


[5/5] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4669ab96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4669ab96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4669ab96

Branch: refs/heads/cassandra-2.1.0
Commit: 4669ab969bfabebf5731fcb2db9a80f442a7c8eb
Parents: 6843b55 7e1adb4
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 29 16:05:40 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 29 16:05:40 2014 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/net/MessageDeliveryTask.java | 2 +-
 src/java/org/apache/cassandra/net/MessageIn.java           | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[4/5] git commit: Backport CASSANDRA-6747

Posted by yu...@apache.org.
Backport CASSANDRA-6747

patch by yukim; reviewed by krummas for CASSANDRA-7560


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e1adb49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e1adb49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e1adb49

Branch: refs/heads/cassandra-2.0
Commit: 7e1adb4976470b48a361ec6dcca7cbbcdb86d85f
Parents: b44bbb8
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 29 16:02:38 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 29 16:02:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/net/CallbackInfo.java  | 15 ++++++++-
 .../net/IAsyncCallbackWithFailure.java          | 28 ++++++++++++++++
 .../cassandra/net/MessageDeliveryTask.java      | 16 ++++++++-
 .../org/apache/cassandra/net/MessageIn.java     | 10 ++++++
 .../apache/cassandra/net/MessagingService.java  | 34 +++++++++++++++-----
 .../cassandra/net/ResponseVerbHandler.java      | 12 +++++--
 .../apache/cassandra/repair/SnapshotTask.java   | 21 +++++++-----
 8 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26d94f7..7f7d2bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
  * Add option to disable STCS in L0 (CASSANDRA-6621)
  * Fix error when doing reversed queries with static columns (CASSANDRA-7490)
+ * Backport CASSANDRA-6747 (CASSANDRA-7560)
 Merged from 1.2:
  * Set correct stream ID on responses when non-Exception Throwables
    are thrown while handling native protocol messages (CASSANDRA-7470)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
     protected final InetAddress target;
     protected final IAsyncCallback callback;
     protected final IVersionedSerializer<?> serializer;
+    private final boolean failureCallback;
+
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    {
+        this(target, callback, serializer, false);
+    }
 
     /**
      * Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
      * @param callback
      * @param serializer serializer to deserialize response message
      */
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {
         this.target = target;
         this.callback = callback;
         this.serializer = serializer;
+        this.failureCallback = failureCallback;
     }
 
     public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
         return false;
     }
 
+    public boolean isFailureCallback()
+    {
+        return failureCallback;
+    }
+
     public String toString()
     {
         return "CallbackInfo(" +
                "target=" + target +
                ", callback=" + callback +
                ", serializer=" + serializer +
+               ", failureCallback=" + failureCallback +
                ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..1f95579
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+    /**
+     * Called when there is an exception on the remote node or timeout happens
+     */
+    public void onFailure(InetAddress from);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..982f17e 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
             return;
         }
 
-        verbHandler.doVerb(message, id);
+        try
+        {
+            verbHandler.doVerb(message, id);
+        }
+        catch (Throwable t)
+        {
+            if (message.doCallbackOnFailure())
+            {
+                MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                                                    .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+                MessagingService.instance().sendReply(response, id, message.from);
+            }
+
+            throw t;
+        }
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..10260c2 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public boolean doCallbackOnFailure()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+    }
+
+    public boolean isFailureResponse()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+    }
+
     public long getTimeout()
     {
         return DatabaseDescriptor.getTimeout(verb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4a5df29..0bb1b17 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
 
     public boolean allNodesAtLeast20 = true;
 
+    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+    public static final byte[] ONE_BYTE = new byte[1];
+    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
-        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
         {
             public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
-                CallbackInfo expiredCallbackInfo = pair.right.value;
+                final CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
                 ConnectionMetrics.totalTimeouts.mark();
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+                if (expiredCallbackInfo.isFailureCallback())
+                {
+                    StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+                        }
+                    });
+                }
 
                 if (expiredCallbackInfo.shouldHint())
                 {
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -576,7 +588,12 @@ public final class MessagingService implements MessagingServiceMBean
 
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
-        return sendRR(message, to, cb, message.getTimeout());
+        return sendRR(message, to, cb, message.getTimeout(), false);
+    }
+
+    public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+    {
+        return sendRR(message, to, cb, message.getTimeout(), true);
     }
 
     /**
@@ -588,12 +605,13 @@ public final class MessagingService implements MessagingServiceMBean
      * @param cb      callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the send().
      * @param timeout the timeout used for expiration
+     * @param failureCallback true if given cb has failure callback
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
-        int id = addCallback(cb, message, to, timeout);
-        sendOneWay(message, id, to);
+        int id = addCallback(cb, message, to, timeout, failureCallback);
+        sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
 
         Tracing.trace("Processing response from {}", message.from);
         IAsyncCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.from, latency);
-        cb.response(message);
+        if (message.isFailureResponse())
+        {
+            ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+        }
+        else
+        {
+            //TODO: Should we add latency only in success cases?
+            MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+            cb.response(message);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..09e8104 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
     public void run()
     {
-        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
-                                                               desc.columnFamily,
-                                                               desc.sessionId.toString(),
-                                                               false).createMessage(),
-                                           endpoint,
-                                           new SnapshotCallback(this));
+        MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+                                                                          desc.columnFamily,
+                                                                          desc.sessionId.toString(),
+                                                                          false).createMessage(),
+                                                      endpoint,
+                                                      new SnapshotCallback(this));
     }
 
     /**
      * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
      */
-    static class SnapshotCallback implements IAsyncCallback
+    static class SnapshotCallback implements IAsyncCallbackWithFailure
     {
         final SnapshotTask task;
 
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
         }
 
         public boolean isLatencyForSnitch() { return false; }
+
+        public void onFailure(InetAddress from)
+        {
+            task.setException(new RuntimeException("Could not create snapshot at " + from));
+        }
     }
 }


[2/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by yu...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b44bbb83
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b44bbb83
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b44bbb83

Branch: refs/heads/cassandra-2.1.0
Commit: b44bbb83aa997a583dbbdeb7b1375e999fac6dda
Parents: a5cc16d 249bbfc
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 29 13:58:27 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 29 13:58:27 2014 -0500

----------------------------------------------------------------------
 bin/cqlsh | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b44bbb83/bin/cqlsh
----------------------------------------------------------------------


[3/5] git commit: Backport CASSANDRA-6747

Posted by yu...@apache.org.
Backport CASSANDRA-6747

patch by yukim; reviewed by krummas for CASSANDRA-7560


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e1adb49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e1adb49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e1adb49

Branch: refs/heads/cassandra-2.1.0
Commit: 7e1adb4976470b48a361ec6dcca7cbbcdb86d85f
Parents: b44bbb8
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 29 16:02:38 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 29 16:02:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/net/CallbackInfo.java  | 15 ++++++++-
 .../net/IAsyncCallbackWithFailure.java          | 28 ++++++++++++++++
 .../cassandra/net/MessageDeliveryTask.java      | 16 ++++++++-
 .../org/apache/cassandra/net/MessageIn.java     | 10 ++++++
 .../apache/cassandra/net/MessagingService.java  | 34 +++++++++++++++-----
 .../cassandra/net/ResponseVerbHandler.java      | 12 +++++--
 .../apache/cassandra/repair/SnapshotTask.java   | 21 +++++++-----
 8 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26d94f7..7f7d2bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
  * Add option to disable STCS in L0 (CASSANDRA-6621)
  * Fix error when doing reversed queries with static columns (CASSANDRA-7490)
+ * Backport CASSANDRA-6747 (CASSANDRA-7560)
 Merged from 1.2:
  * Set correct stream ID on responses when non-Exception Throwables
    are thrown while handling native protocol messages (CASSANDRA-7470)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
     protected final InetAddress target;
     protected final IAsyncCallback callback;
     protected final IVersionedSerializer<?> serializer;
+    private final boolean failureCallback;
+
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    {
+        this(target, callback, serializer, false);
+    }
 
     /**
      * Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
      * @param callback
      * @param serializer serializer to deserialize response message
      */
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {
         this.target = target;
         this.callback = callback;
         this.serializer = serializer;
+        this.failureCallback = failureCallback;
     }
 
     public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
         return false;
     }
 
+    public boolean isFailureCallback()
+    {
+        return failureCallback;
+    }
+
     public String toString()
     {
         return "CallbackInfo(" +
                "target=" + target +
                ", callback=" + callback +
                ", serializer=" + serializer +
+               ", failureCallback=" + failureCallback +
                ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..1f95579
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+    /**
+     * Called when there is an exception on the remote node or timeout happens
+     */
+    public void onFailure(InetAddress from);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..982f17e 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
             return;
         }
 
-        verbHandler.doVerb(message, id);
+        try
+        {
+            verbHandler.doVerb(message, id);
+        }
+        catch (Throwable t)
+        {
+            if (message.doCallbackOnFailure())
+            {
+                MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                                                    .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+                MessagingService.instance().sendReply(response, id, message.from);
+            }
+
+            throw t;
+        }
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..10260c2 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public boolean doCallbackOnFailure()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+    }
+
+    public boolean isFailureResponse()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+    }
+
     public long getTimeout()
     {
         return DatabaseDescriptor.getTimeout(verb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4a5df29..0bb1b17 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
 
     public boolean allNodesAtLeast20 = true;
 
+    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+    public static final byte[] ONE_BYTE = new byte[1];
+    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
-        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
         {
             public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
-                CallbackInfo expiredCallbackInfo = pair.right.value;
+                final CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
                 ConnectionMetrics.totalTimeouts.mark();
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+                if (expiredCallbackInfo.isFailureCallback())
+                {
+                    StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+                        }
+                    });
+                }
 
                 if (expiredCallbackInfo.shouldHint())
                 {
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -576,7 +588,12 @@ public final class MessagingService implements MessagingServiceMBean
 
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
-        return sendRR(message, to, cb, message.getTimeout());
+        return sendRR(message, to, cb, message.getTimeout(), false);
+    }
+
+    public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+    {
+        return sendRR(message, to, cb, message.getTimeout(), true);
     }
 
     /**
@@ -588,12 +605,13 @@ public final class MessagingService implements MessagingServiceMBean
      * @param cb      callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the send().
      * @param timeout the timeout used for expiration
+     * @param failureCallback true if cb is an IAsyncCallbackWithFailure whose onFailure should be invoked when the request fails or times out
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
-        int id = addCallback(cb, message, to, timeout);
-        sendOneWay(message, id, to);
+        int id = addCallback(cb, message, to, timeout, failureCallback);
+        sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
 
         Tracing.trace("Processing response from {}", message.from);
         IAsyncCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.from, latency);
-        cb.response(message);
+        if (message.isFailureResponse())
+        {
+            ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+        }
+        else
+        {
+            //TODO: Should we add latency only in success cases?
+            MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+            cb.response(message);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..09e8104 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
     public void run()
     {
-        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
-                                                               desc.columnFamily,
-                                                               desc.sessionId.toString(),
-                                                               false).createMessage(),
-                                           endpoint,
-                                           new SnapshotCallback(this));
+        MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+                                                                          desc.columnFamily,
+                                                                          desc.sessionId.toString(),
+                                                                          false).createMessage(),
+                                                      endpoint,
+                                                      new SnapshotCallback(this));
     }
 
     /**
      * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
      */
-    static class SnapshotCallback implements IAsyncCallback
+    static class SnapshotCallback implements IAsyncCallbackWithFailure
     {
         final SnapshotTask task;
 
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
         }
 
         public boolean isLatencyForSnitch() { return false; }
+
+        public void onFailure(InetAddress from)
+        {
+            task.setException(new RuntimeException("Could not create snapshot at " + from));
+        }
     }
 }