You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/26 17:24:42 UTC
[cassandra] branch cassandra-3.0 updated: Fix failure handling in inter-node communication
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 7f54fe0 Fix failure handling in inter-node communication
7f54fe0 is described below
commit 7f54fe02298b90e6152acc026384c033a96ce621
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Tue Oct 26 18:14:35 2021 +0100
Fix failure handling in inter-node communication
patch by Aleksandr Sorokoumov; reviewed by Andrés de la Peña and Paulo Motta for CASSANDRA-16334
---
CHANGES.txt | 1 +
.../apache/cassandra/db/MutationVerbHandler.java | 3 ++-
.../apache/cassandra/net/MessageDeliveryTask.java | 22 ++++++++++++++++++----
.../service/AbstractWriteResponseHandler.java | 3 +++
.../org/apache/cassandra/service/StorageProxy.java | 2 +-
5 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c79746f..7dd58ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Fix failure handling in inter-node communication (CASSANDRA-16334)
* Log more information when a node runs out of commitlog space (CASSANDRA-11323)
* Don't take snapshots when truncating system tables (CASSANDRA-16839)
* Make -Dtest.methods consistently optional in all Ant test targets (CASSANDRA-17014)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 5888438..10b77f2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -87,7 +87,8 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
int size = in.readInt();
// tell the recipients who to send their ack to
- MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+ MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress())
+ .withParameter(MessagingService.FAILURE_CALLBACK_PARAM, MessagingService.ONE_BYTE);
// Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index ce6eebc..26e780f 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,11 +18,14 @@
package org.apache.cassandra.net;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -31,12 +34,12 @@ public class MessageDeliveryTask implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
- private final MessageIn message;
+ private final MessageIn<?> message;
private final int id;
private final long constructionTime;
private final boolean isCrossNodeTimestamp;
- public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
+ public MessageDeliveryTask(MessageIn<?> message, int id, long timestamp, boolean isCrossNodeTimestamp)
{
assert message != null;
this.message = message;
@@ -90,9 +93,20 @@ public class MessageDeliveryTask implements Runnable
{
if (message.doCallbackOnFailure())
{
- MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+ MessageOut<?> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE)
.withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
- MessagingService.instance().sendReply(response, id, message.from);
+
+ InetAddress from;
+ byte[] fromBytes = message.parameters.get(Mutation.FORWARD_FROM);
+ try
+ {
+ from = fromBytes != null ? InetAddress.getByAddress(fromBytes) : message.from;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ MessagingService.instance().sendReply(response, id, from);
}
}
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e3ba66e..bf099e4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -122,6 +122,9 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
*/
protected int totalEndpoints()
{
+ if (consistencyLevel != null && consistencyLevel.isDatacenterLocal())
+ return consistencyLevel.countLocalEndpoints(Iterables.concat(naturalEndpoints, pendingEndpoints));
+
return naturalEndpoints.size() + pendingEndpoints.size();
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index a6d35f4..89f93f6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1445,7 +1445,7 @@ public class StorageProxy implements StorageProxyMBean
catch (Exception ex)
{
if (!(ex instanceof WriteTimeoutException))
- logger.error("Failed to apply mutation locally : {}", ex);
+ logger.error("Failed to apply mutation locally : ", ex);
handler.onFailure(FBUtilities.getBroadcastAddress());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org