You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/26 17:24:42 UTC

[cassandra] branch cassandra-3.0 updated: Fix failure handling in inter-node communication

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 7f54fe0  Fix failure handling in inter-node communication
7f54fe0 is described below

commit 7f54fe02298b90e6152acc026384c033a96ce621
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Tue Oct 26 18:14:35 2021 +0100

    Fix failure handling in inter-node communication
    
    patch by Aleksandr Sorokoumov; reviewed by Andrés de la Peña and Paulo Motta for CASSANDRA-16334
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |  3 ++-
 .../apache/cassandra/net/MessageDeliveryTask.java  | 22 ++++++++++++++++++----
 .../service/AbstractWriteResponseHandler.java      |  3 +++
 .../org/apache/cassandra/service/StorageProxy.java |  2 +-
 5 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c79746f..7dd58ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Fix failure handling in inter-node communication (CASSANDRA-16334)
  * Log more information when a node runs out of commitlog space (CASSANDRA-11323)
  * Don't take snapshots when truncating system tables (CASSANDRA-16839)
  * Make -Dtest.methods consistently optional in all Ant test targets (CASSANDRA-17014)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 5888438..10b77f2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -87,7 +87,8 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
             int size = in.readInt();
 
             // tell the recipients who to send their ack to
-            MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+            MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress())
+                                                                                                .withParameter(MessagingService.FAILURE_CALLBACK_PARAM, MessagingService.ONE_BYTE);
             // Send a message to each of the addresses on our Forward List
             for (int i = 0; i < size; i++)
             {
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index ce6eebc..26e780f 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,11 +18,14 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -31,12 +34,12 @@ public class MessageDeliveryTask implements Runnable
 {
     private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
 
-    private final MessageIn message;
+    private final MessageIn<?> message;
     private final int id;
     private final long constructionTime;
     private final boolean isCrossNodeTimestamp;
 
-    public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
+    public MessageDeliveryTask(MessageIn<?> message, int id, long timestamp, boolean isCrossNodeTimestamp)
     {
         assert message != null;
         this.message = message;
@@ -90,9 +93,20 @@ public class MessageDeliveryTask implements Runnable
     {
         if (message.doCallbackOnFailure())
         {
-            MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+            MessageOut<?> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE)
                                                 .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
-            MessagingService.instance().sendReply(response, id, message.from);
+
+            InetAddress from;
+            byte[] fromBytes = message.parameters.get(Mutation.FORWARD_FROM);
+            try
+            {
+                from = fromBytes != null ? InetAddress.getByAddress(fromBytes) : message.from;
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+            MessagingService.instance().sendReply(response, id, from);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e3ba66e..bf099e4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -122,6 +122,9 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      */
     protected int totalEndpoints()
     {
+        if (consistencyLevel != null && consistencyLevel.isDatacenterLocal())
+            return consistencyLevel.countLocalEndpoints(Iterables.concat(naturalEndpoints, pendingEndpoints));
+
         return naturalEndpoints.size() + pendingEndpoints.size();
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index a6d35f4..89f93f6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1445,7 +1445,7 @@ public class StorageProxy implements StorageProxyMBean
                 catch (Exception ex)
                 {
                     if (!(ex instanceof WriteTimeoutException))
-                        logger.error("Failed to apply mutation locally : {}", ex);
+                        logger.error("Failed to apply mutation locally : ", ex);
                     handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org