You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/09 04:37:57 UTC
[2/3] git commit: Add failure handler to async callback
Add failure handler to async callback
patch by sankalp kohli; reviewed by yukim for CASSANDRA-6747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a5b90ed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a5b90ed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a5b90ed
Branch: refs/heads/trunk
Commit: 8a5b90ede0701e05f776e6adccc71f6fbd43e3aa
Parents: 66b3b2b
Author: sankalp kohli <oh...@gmail.com>
Authored: Tue Apr 8 21:37:00 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Apr 8 21:37:00 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/CallbackInfo.java | 15 ++++++++-
.../net/IAsyncCallbackWithFailure.java | 29 +++++++++++++++++
.../cassandra/net/MessageDeliveryTask.java | 16 +++++++++-
.../org/apache/cassandra/net/MessageIn.java | 10 ++++++
.../apache/cassandra/net/MessagingService.java | 33 +++++++++++++++-----
.../cassandra/net/ResponseVerbHandler.java | 12 +++++--
.../apache/cassandra/repair/SnapshotTask.java | 21 ++++++++-----
.../cassandra/service/ActiveRepairService.java | 23 +++++++++++---
.../cassandra/service/StorageService.java | 12 ++++++-
10 files changed, 146 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f1e8b20..94dd7c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -42,6 +42,7 @@
* Lock counter cells, not partitions (CASSANDRA-6880)
* Track presence of legacy counter shards in sstables (CASSANDRA-6888)
* Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
+ * Add failure handler to async callback (CASSANDRA-6747)
Merged from 2.0:
* Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
* Allow compaction of system tables during startup (CASSANDRA-6913)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
protected final InetAddress target;
protected final IAsyncCallback callback;
protected final IVersionedSerializer<?> serializer;
+ private final boolean failureCallback;
+
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+ {
+ this(target, callback, serializer, false);
+ }
/**
* Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
* @param callback
* @param serializer serializer to deserialize response message
*/
- public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+ public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
{
this.target = target;
this.callback = callback;
this.serializer = serializer;
+ this.failureCallback = failureCallback;
}
public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
return false;
}
+ public boolean isFailureCallback()
+ {
+ return failureCallback;
+ }
+
public String toString()
{
return "CallbackInfo(" +
"target=" + target +
", callback=" + callback +
", serializer=" + serializer +
+ ", failureCallback=" + failureCallback +
')';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..744bb62
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+
+ /**
+ * Called when an exception occurs on the remote node or the request times out.
+ */
+ public void onFailure(InetAddress from);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..387817b 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
return;
}
- verbHandler.doVerb(message, id);
+ try
+ {
+ verbHandler.doVerb(message, id);
+ }
+ catch (Throwable t)
+ {
+ if (message.isFailureCallback())
+ {
+ MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+ .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+ MessagingService.instance().sendReply(response, id, message.from);
+ }
+
+ throw t;
+ }
if (GOSSIP_VERBS.contains(message.verb))
Gossiper.instance.setLastProcessedMessageAt(constructionTime);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..b37ca2b 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
return MessagingService.verbStages.get(verb);
}
+ public boolean isFailureCallback()
+ {
+ return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+ }
+
+ public boolean isFailureResponse()
+ {
+ return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+ }
+
public long getTimeout()
{
return DatabaseDescriptor.getTimeout(verb);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ab12020..6d9a1b5 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
public static final int VERSION_21 = 8;
public static final int current_version = VERSION_21;
+ public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+ public static final byte[] ONE_BYTE = new byte[1];
+ public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
/**
* we preface every message with this number so the recipient can validate the sender is sane
*/
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
put(Verb.INDEX_SCAN, Stage.READ);
put(Verb.REPLICATION_FINISHED, Stage.MISC);
- put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
put(Verb.COUNTER_MUTATION, Stage.MUTATION);
put(Verb.SNAPSHOT, Stage.MISC);
put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
{
public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
{
- CallbackInfo expiredCallbackInfo = pair.right.value;
+ final CallbackInfo expiredCallbackInfo = pair.right.value;
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
ConnectionMetrics.totalTimeouts.mark();
getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+ if (expiredCallbackInfo.isFailureCallback())
+ {
+ StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+ @Override
+ public void run() {
+ ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+ }
+ });
+ }
if (expiredCallbackInfo.shouldHint())
{
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
return verbHandlers.get(type);
}
- public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -564,7 +576,12 @@ public final class MessagingService implements MessagingServiceMBean
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
{
- return sendRR(message, to, cb, message.getTimeout());
+ return sendRR(message, to, cb, message.getTimeout(), false);
+ }
+
+ public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+ {
+ return sendRR(message, to, cb, message.getTimeout(), true);
}
/**
@@ -581,10 +598,10 @@ public final class MessagingService implements MessagingServiceMBean
* @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
{
- int id = addCallback(cb, message, to, timeout);
- sendOneWay(message, id, to);
+ int id = addCallback(cb, message, to, timeout, failureCallback);
+ sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
Tracing.trace("Processing response from {}", message.from);
IAsyncCallback cb = callbackInfo.callback;
- MessagingService.instance().maybeAddLatency(cb, message.from, latency);
- cb.response(message);
+ if (message.isFailureResponse())
+ {
+ ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+ }
+ else
+ {
+ //TODO: Should we add latency only in success cases?
+ MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+ cb.response(message);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..cb5003a 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
public void run()
{
- MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
- desc.columnFamily,
- desc.sessionId.toString(),
- false).createMessage(),
- endpoint,
- new SnapshotCallback(this));
+ MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+ desc.columnFamily,
+ desc.sessionId.toString(),
+ false).createMessage(),
+ endpoint,
+ new SnapshotCallback(this));
}
/**
* Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
*/
- static class SnapshotCallback implements IAsyncCallback
+ static class SnapshotCallback implements IAsyncCallbackWithFailure
{
final SnapshotTask task;
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
}
public boolean isLatencyForSnitch() { return false; }
+
+ public void onFailure(InetAddress from)
+ {
+ task.setException(new RuntimeException("Could not create snapshot at " + from));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 83d8727..7a7aac9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -43,7 +44,7 @@ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -249,19 +250,24 @@ public class ActiveRepairService
UUID parentRepairSession = UUIDGen.getTimeUUID();
registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
- IAsyncCallback callback = new IAsyncCallback()
+ final AtomicBoolean status = new AtomicBoolean(true);
+ IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure()
{
- @Override
public void response(MessageIn msg)
{
prepareLatch.countDown();
}
- @Override
public boolean isLatencyForSnitch()
{
return false;
}
+
+ public void onFailure(InetAddress from)
+ {
+ status.set(false);
+ prepareLatch.countDown();
+ }
};
List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
@@ -272,7 +278,7 @@ public class ActiveRepairService
{
PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
MessageOut<RepairMessage> msg = message.createMessage();
- MessagingService.instance().sendRR(msg, neighbour, callback);
+ MessagingService.instance().sendRRWithFailure(msg, neighbour, callback);
}
try
{
@@ -283,6 +289,13 @@ public class ActiveRepairService
parentRepairSessions.remove(parentRepairSession);
throw new RuntimeException("Did not get replies from all endpoints.", e);
}
+
+ if (!status.get())
+ {
+ parentRepairSessions.remove(parentRepairSession);
+ throw new RuntimeException("Did not get positive replies from all endpoints.");
+ }
+
return parentRepairSession;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a5b90ed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9f8158f..3e94172 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2642,7 +2642,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
UUID parentSession = null;
if (!fullRepair)
- parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+ {
+ try
+ {
+ parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+ }
+ catch (Throwable t)
+ {
+ sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
+ }
List<RepairFuture> futures = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)