You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/07/30 19:00:19 UTC
[02/17] git commit: Backport CASSANDRA-6747
Backport CASSANDRA-6747
patch by yukim; reviewed by krummas for CASSANDRA-7560
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e1adb49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e1adb49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e1adb49
Branch: refs/heads/trunk
Commit: 7e1adb4976470b48a361ec6dcca7cbbcdb86d85f
Parents: b44bbb8
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 29 16:02:38 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 29 16:02:38 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/CallbackInfo.java | 15 ++++++++-
.../net/IAsyncCallbackWithFailure.java | 28 ++++++++++++++++
.../cassandra/net/MessageDeliveryTask.java | 16 ++++++++-
.../org/apache/cassandra/net/MessageIn.java | 10 ++++++
.../apache/cassandra/net/MessagingService.java | 34 +++++++++++++++-----
.../cassandra/net/ResponseVerbHandler.java | 12 +++++--
.../apache/cassandra/repair/SnapshotTask.java | 21 +++++++-----
8 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26d94f7..7f7d2bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
* Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
* Add option to disable STCS in L0 (CASSANDRA-6621)
* Fix error when doing reversed queries with static columns (CASSANDRA-7490)
+ * Backport CASSANDRA-6747 (CASSANDRA-7560)
Merged from 1.2:
* Set correct stream ID on responses when non-Exception Throwables
are thrown while handling native protocol messages (CASSANDRA-7470)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
protected final InetAddress target;
protected final IAsyncCallback callback;
protected final IVersionedSerializer<?> serializer;
+ private final boolean failureCallback;
+
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+ {
+ this(target, callback, serializer, false);
+ }
/**
* Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
{
this.target = target;
this.callback = callback;
this.serializer = serializer;
+ this.failureCallback = failureCallback;
}
public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
return false;
}
+ public boolean isFailureCallback()
+ {
+ return failureCallback;
+ }
+
public String toString()
{
return "CallbackInfo(" +
"target=" + target +
", callback=" + callback +
", serializer=" + serializer +
+ ", failureCallback=" + failureCallback +
')';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..1f95579
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+ /**
+ * Called when the remote node reports an exception while handling the request, or when the request times out.
+ */
+ public void onFailure(InetAddress from);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..982f17e 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
return;
}
- verbHandler.doVerb(message, id);
+ try
+ {
+ verbHandler.doVerb(message, id);
+ }
+ catch (Throwable t)
+ {
+ if (message.doCallbackOnFailure())
+ {
+ MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+ .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+ MessagingService.instance().sendReply(response, id, message.from);
+ }
+
+ throw t;
+ }
if (GOSSIP_VERBS.contains(message.verb))
Gossiper.instance.setLastProcessedMessageAt(constructionTime);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..10260c2 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
return MessagingService.verbStages.get(verb);
}
+ public boolean doCallbackOnFailure()
+ {
+ return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+ }
+
+ public boolean isFailureResponse()
+ {
+ return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+ }
+
public long getTimeout()
{
return DatabaseDescriptor.getTimeout(verb);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4a5df29..0bb1b17 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
public boolean allNodesAtLeast20 = true;
+ public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+ public static final byte[] ONE_BYTE = new byte[1];
+ public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
/**
* we preface every message with this number so the recipient can validate the sender is sane
*/
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
put(Verb.INDEX_SCAN, Stage.READ);
put(Verb.REPLICATION_FINISHED, Stage.MISC);
- put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
put(Verb.COUNTER_MUTATION, Stage.MUTATION);
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
{
public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
{
- CallbackInfo expiredCallbackInfo = pair.right.value;
+ final CallbackInfo expiredCallbackInfo = pair.right.value;
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
ConnectionMetrics.totalTimeouts.mark();
getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+ if (expiredCallbackInfo.isFailureCallback())
+ {
+ StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+ @Override
+ public void run() {
+ ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+ }
+ });
+ }
if (expiredCallbackInfo.shouldHint())
{
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
return verbHandlers.get(type);
}
- public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -576,7 +588,12 @@ public final class MessagingService implements MessagingServiceMBean
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
{
- return sendRR(message, to, cb, message.getTimeout());
+ return sendRR(message, to, cb, message.getTimeout(), false);
+ }
+
+ public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+ {
+ return sendRR(message, to, cb, message.getTimeout(), true);
}
/**
@@ -588,12 +605,13 @@ public final class MessagingService implements MessagingServiceMBean
* @param cb callback interface which is used to pass the responses or
* suggest that a timeout occurred to the invoker of the send().
* @param timeout the timeout used for expiration
+ * @param failureCallback true if cb is an IAsyncCallbackWithFailure whose onFailure should be invoked on a remote failure or timeout
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
{
- int id = addCallback(cb, message, to, timeout);
- sendOneWay(message, id, to);
+ int id = addCallback(cb, message, to, timeout, failureCallback);
+ sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
Tracing.trace("Processing response from {}", message.from);
IAsyncCallback cb = callbackInfo.callback;
- MessagingService.instance().maybeAddLatency(cb, message.from, latency);
- cb.response(message);
+ if (message.isFailureResponse())
+ {
+ ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+ }
+ else
+ {
+ //TODO: Should we add latency only in success cases?
+ MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+ cb.response(message);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..09e8104 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
public void run()
{
- MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
- desc.columnFamily,
- desc.sessionId.toString(),
- false).createMessage(),
- endpoint,
- new SnapshotCallback(this));
+ MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+ desc.columnFamily,
+ desc.sessionId.toString(),
+ false).createMessage(),
+ endpoint,
+ new SnapshotCallback(this));
}
/**
* Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
*/
- static class SnapshotCallback implements IAsyncCallback
+ static class SnapshotCallback implements IAsyncCallbackWithFailure
{
final SnapshotTask task;
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
}
public boolean isLatencyForSnitch() { return false; }
+
+ public void onFailure(InetAddress from)
+ {
+ task.setException(new RuntimeException("Could not create snapshot at " + from));
+ }
}
}