Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/26 17:24:45 UTC

[cassandra] branch cassandra-4.0 updated (5aa2fb8 -> 530bc91)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 5aa2fb8  Merge branch 'cassandra-3.11' into cassandra-4.0
     new 7f54fe0  Fix failure handling in inter-node communication
     new c76a939  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 530bc91  Merge branch 'cassandra-3.11' into cassandra-4.0

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                                      | 1 +
 src/java/org/apache/cassandra/net/InboundSink.java               | 8 ++++++--
 .../apache/cassandra/service/AbstractWriteResponseHandler.java   | 9 ++++++---
 3 files changed, 13 insertions(+), 5 deletions(-)
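
The substantive fix in InboundSink.java changes where a failure response is sent. Previously it always went to `header.from`. Now it goes to `header.respondTo()` when that is set, and falls back to `header.from` otherwise. As we understand it, `respondTo` is populated when a node relays a message on behalf of the original sender, as in cross-datacenter write forwarding. In that case the failure must reach the original coordinator, which holds the callback, and not the relaying node.

The following is a minimal, self-contained sketch of that routing rule. `FailureRoutingSketch`, its `Header` class, and the string addresses are hypothetical simplifications, not Cassandra's API:

```java
import java.util.Objects;

public final class FailureRoutingSketch {
    // Hypothetical stand-in for Message.Header, reduced to the fields fail() reads.
    static final class Header {
        final String from;               // node that delivered this message to us
        final String respondTo;          // node that should get the response, or null
        final boolean callBackOnFailure; // did the sender ask for failure responses?

        Header(String from, String respondTo, boolean callBackOnFailure) {
            this.from = Objects.requireNonNull(from);
            this.respondTo = respondTo;
            this.callBackOnFailure = callBackOnFailure;
        }
    }

    // Same choice as the patched fail(): prefer respondTo, fall back to from.
    static String failureDestination(Header header) {
        return header.respondTo != null ? header.respondTo : header.from;
    }

    // Destination of the failure response, or null when no response is sent.
    static String routeFailure(Header header) {
        return header.callBackOnFailure ? failureDestination(header) : null;
    }

    public static void main(String[] args) {
        // Direct request: the failure goes back to the sender (prints 10.0.0.2).
        System.out.println(routeFailure(new Header("10.0.0.2", null, true)));
        // Relayed request: the failure goes to respondTo, not the relay (prints 10.0.0.1).
        System.out.println(routeFailure(new Header("10.0.1.5", "10.0.0.1", true)));
    }
}
```

Before the fix, the relayed case would have sent the failure to `10.0.1.5`. That node has no callback registered for the request, so the coordinator would presumably wait until the request timed out.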

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into cassandra-4.0


adelapena pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 530bc914cdf28c9c10eb53e3614b16cb9da0787b
Merge: 5aa2fb8 c76a939
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Tue Oct 26 18:20:41 2021 +0100

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                                      | 1 +
 src/java/org/apache/cassandra/net/InboundSink.java               | 8 ++++++--
 .../apache/cassandra/service/AbstractWriteResponseHandler.java   | 9 ++++++---
 3 files changed, 13 insertions(+), 5 deletions(-)
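
InboundSink, whose full source appears in this merge's combined diff, lets tests intercept inbound messages. Each `add(Predicate)` wraps the current sink in a `Filtered` link. `remove(Predicate)` rebuilds the chain without links whose condition equals the given predicate. `clear()` unwraps every link down to the terminal consumer.

The sketch below shows the same composition pattern. It uses `java.util.function.Consumer` in place of Chronicle's `ThrowingConsumer`. It also uses plain functions instead of the `AtomicReferenceFieldUpdater` that InboundSink uses to swap the sink atomically. `FilterChainSketch` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public final class FilterChainSketch {
    // One link in the chain: forwards a message to `next` only if `condition` passes.
    static final class Filtered<M> implements Consumer<M> {
        final Predicate<M> condition;
        final Consumer<M> next;

        Filtered(Predicate<M> condition, Consumer<M> next) {
            this.condition = condition;
            this.next = next;
        }

        public void accept(M message) {
            if (condition.test(message))
                next.accept(message);
        }
    }

    // Prepends a filter, as InboundSink.add() does.
    static <M> Consumer<M> add(Consumer<M> sink, Predicate<M> allow) {
        return new Filtered<>(allow, sink);
    }

    // Drops every link whose condition equals `condition`; links above a removed one
    // are rebuilt, and unchanged sub-chains are reused (mirrors InboundSink.without()).
    @SuppressWarnings("unchecked")
    static <M> Consumer<M> without(Consumer<M> sink, Predicate<M> condition) {
        if (!(sink instanceof Filtered))
            return sink;
        Filtered<M> filtered = (Filtered<M>) sink;
        Consumer<M> next = without(filtered.next, condition);
        if (condition.equals(filtered.condition))
            return next;
        return next == filtered.next ? sink : new Filtered<>(filtered.condition, next);
    }

    // Unwraps all filters, leaving the terminal consumer (mirrors InboundSink.clear()).
    @SuppressWarnings("unchecked")
    static <M> Consumer<M> clear(Consumer<M> sink) {
        while (sink instanceof Filtered)
            sink = ((Filtered<M>) sink).next;
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> delivered = new ArrayList<>();
        Predicate<Integer> even = x -> x % 2 == 0;
        Consumer<Integer> sink = add(delivered::add, even);
        sink.accept(3);
        sink.accept(4);
        System.out.println(delivered); // only the even message was delivered
    }
}
```

Lambdas compare by identity, so `without` removes a filter only when it is given the same `Predicate` instance that was passed to `add`.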

diff --cc CHANGES.txt
index 5393f7e,0f8b5de..0ad423e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,17 +1,24 @@@
 -3.11.12
 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA=17028)
 +4.0.2
 + * Push initial client connection messages to trace (CASSANDRA-17038)
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to ssstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components to snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Fix failure handling in inter-node communication (CASSANDRA-16334)
   * Log more information when a node runs out of commitlog space (CASSANDRA-11323)
   * Don't take snapshots when truncating system tables (CASSANDRA-16839)
   * Make -Dtest.methods consistently optional in all Ant test targets (CASSANDRA-17014)
diff --cc src/java/org/apache/cassandra/net/InboundSink.java
index df63be2,0000000..16eb440
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/InboundSink.java
+++ b/src/java/org/apache/cassandra/net/InboundSink.java
@@@ -1,161 -1,0 +1,165 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.io.IOException;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +import java.util.function.Predicate;
 +
 +import org.slf4j.LoggerFactory;
 +
 +import net.openhft.chronicle.core.util.ThrowingConsumer;
 +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.index.IndexNotAvailableException;
++import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +/**
 + * A message sink that all inbound messages go through.
 + *
 + * Default sink used by {@link MessagingService} is {@link IVerbHandler#doVerb(Message)}, but it can be overridden
 + * to filter out certain messages, record the fact of attempted delivery, or delay arrival.
 + *
 + * This facility is most useful for test code.
 + *
 + * {@link #accept(Message)} is invoked on a thread belonging to the {@link org.apache.cassandra.concurrent.Stage}
 + * assigned to the {@link Verb} of the message.
 + */
 +public class InboundSink implements InboundMessageHandlers.MessageConsumer
 +{
 +    private static final NoSpamLogger noSpamLogger =
 +        NoSpamLogger.getLogger(LoggerFactory.getLogger(InboundSink.class), 1L, TimeUnit.SECONDS);
 +
 +    private static class Filtered implements ThrowingConsumer<Message<?>, IOException>
 +    {
 +        final Predicate<Message<?>> condition;
 +        final ThrowingConsumer<Message<?>, IOException> next;
 +
 +        private Filtered(Predicate<Message<?>> condition, ThrowingConsumer<Message<?>, IOException> next)
 +        {
 +            this.condition = condition;
 +            this.next = next;
 +        }
 +
 +        public void accept(Message<?> message) throws IOException
 +        {
 +            if (condition.test(message))
 +                next.accept(message);
 +        }
 +    }
 +
 +    @SuppressWarnings("FieldMayBeFinal")
 +    private volatile ThrowingConsumer<Message<?>, IOException> sink;
 +    private static final AtomicReferenceFieldUpdater<InboundSink, ThrowingConsumer> sinkUpdater
 +        = AtomicReferenceFieldUpdater.newUpdater(InboundSink.class, ThrowingConsumer.class, "sink");
 +
 +    private final MessagingService messaging;
 +
 +    InboundSink(MessagingService messaging)
 +    {
 +        this.messaging = messaging;
 +        this.sink = message -> message.header.verb.handler().doVerb((Message<Object>) message);
 +    }
 +
 +    public void fail(Message.Header header, Throwable failure)
 +    {
 +        if (header.callBackOnFailure())
 +        {
-             Message response = Message.failureResponse(header.id, header.expiresAtNanos, RequestFailureReason.forException(failure));
-             messaging.send(response, header.from);
++            InetAddressAndPort to = header.respondTo() != null ? header.respondTo() : header.from;
++            Message<RequestFailureReason> response = Message.failureResponse(header.id,
++                                                                             header.expiresAtNanos,
++                                                                             RequestFailureReason.forException(failure));
++            messaging.send(response, to);
 +        }
 +    }
 +
 +    public void accept(Message<?> message)
 +    {
 +        try
 +        {
 +            sink.accept(message);
 +        }
 +        catch (Throwable t)
 +        {
 +            fail(message.header, t);
 +
 +            if (t instanceof TombstoneOverwhelmingException || t instanceof IndexNotAvailableException)
 +                noSpamLogger.error(t.getMessage());
 +            else if (t instanceof RuntimeException)
 +                throw (RuntimeException) t;
 +            else
 +                throw new RuntimeException(t);
 +        }
 +    }
 +
 +    public void add(Predicate<Message<?>> allow)
 +    {
 +        sinkUpdater.updateAndGet(this, sink -> new Filtered(allow, sink));
 +    }
 +
 +    public void remove(Predicate<Message<?>> allow)
 +    {
 +        sinkUpdater.updateAndGet(this, sink -> without(sink, allow));
 +    }
 +
 +    public void clear()
 +    {
 +        sinkUpdater.updateAndGet(this, InboundSink::clear);
 +    }
 +
 +    @Deprecated // TODO: this is not the correct way to do things
 +    public boolean allow(Message<?> message)
 +    {
 +        return allows(sink, message);
 +    }
 +
 +    private static ThrowingConsumer<Message<?>, IOException> clear(ThrowingConsumer<Message<?>, IOException> sink)
 +    {
 +        while (sink instanceof Filtered)
 +            sink = ((Filtered) sink).next;
 +        return sink;
 +    }
 +
 +    private static ThrowingConsumer<Message<?>, IOException> without(ThrowingConsumer<Message<?>, IOException> sink, Predicate<Message<?>> condition)
 +    {
 +        if (!(sink instanceof Filtered))
 +            return sink;
 +
 +        Filtered filtered = (Filtered) sink;
 +        ThrowingConsumer<Message<?>, IOException> next = without(filtered.next, condition);
 +        return condition.equals(filtered.condition) ? next
 +                                                    : next == filtered.next
 +                                                      ? sink
 +                                                      : new Filtered(filtered.condition, next);
 +    }
 +
 +    private static boolean allows(ThrowingConsumer<Message<?>, IOException> sink, Message<?> message)
 +    {
 +        while (sink instanceof Filtered)
 +        {
 +            Filtered filtered = (Filtered) sink;
 +            if (!filtered.condition.test(message))
 +                return false;
 +            sink = filtered.next;
 +        }
 +        return true;
 +    }
 +
 +}
diff --cc src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4f384a4,dc58701..7128277
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@@ -32,56 -30,40 +32,56 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.ConsistencyLevel;
 -import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.IMutation;
  import org.apache.cassandra.db.WriteType;
 -import org.apache.cassandra.exceptions.*;
 -import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 -import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.exceptions.WriteFailureException;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.Schema;
  import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
 -public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
- 
++import static org.apache.cassandra.locator.Replicas.countInOurDc;
 +
 +public abstract class AbstractWriteResponseHandler<T> implements RequestCallback<T>
  {
 -    protected static final Logger logger = LoggerFactory.getLogger( AbstractWriteResponseHandler.class );
 +    protected static final Logger logger = LoggerFactory.getLogger(AbstractWriteResponseHandler.class);
  
 +    //Count down until all responses and expirations have occured before deciding whether the ideal CL was reached.
 +    private AtomicInteger responsesAndExpirations;
      private final SimpleCondition condition = new SimpleCondition();
 -    protected final Keyspace keyspace;
 -    protected final Collection<InetAddress> naturalEndpoints;
 -    public final ConsistencyLevel consistencyLevel;
 +    protected final ReplicaPlan.ForTokenWrite replicaPlan;
 +
      protected final Runnable callback;
 -    protected final Collection<InetAddress> pendingEndpoints;
      protected final WriteType writeType;
      private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
 -        = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
 +    = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
      private volatile int failures = 0;
 -    private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
 +    private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
      private final long queryStartNanoTime;
 -    private volatile boolean supportsBackPressure = true;
  
      /**
 -     * @param callback A callback to be called when the write is successful.
 +      * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached.
 +      * Will be set to null if ideal CL was not configured
 +      * Will be set to an AWRH delegate if ideal CL was configured
 +      * Will be same as "this" if this AWRH is the ideal consistency level
 +      */
 +    private AbstractWriteResponseHandler idealCLDelegate;
 +
 +    /**
 +     * We don't want to increment the writeFailedIdealCL if we didn't achieve the original requested CL
 +     */
 +    private boolean requestedCLAchieved = false;
 +
 +    /**
 +     * @param callback           A callback to be called when the write is successful.
       * @param queryStartNanoTime
       */
 -    protected AbstractWriteResponseHandler(Keyspace keyspace,
 -                                           Collection<InetAddress> naturalEndpoints,
 -                                           Collection<InetAddress> pendingEndpoints,
 -                                           ConsistencyLevel consistencyLevel,
 +    protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                             Runnable callback,
                                             WriteType writeType,
                                             long queryStartNanoTime)
@@@ -202,18 -129,14 +202,21 @@@
      }
  
      /**
 -     * @return the total number of endpoints the request has been sent to.
 +     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
-      *       this needs to be CL aware, and of which nodes are live/down
-      * @return the total number of endpoints the request can been sent to.
++     *       this needs to be aware of which nodes are live/down
++     * @return the total number of endpoints the request can send to.
       */
 -    protected int totalEndpoints()
 +    protected int candidateReplicaCount()
      {
 -        if (consistencyLevel != null && consistencyLevel.isDatacenterLocal())
 -            return consistencyLevel.countLocalEndpoints(Iterables.concat(naturalEndpoints, pendingEndpoints));
++        if (replicaPlan.consistencyLevel().isDatacenterLocal())
++            return countInOurDc(replicaPlan.liveAndDown()).allReplicas();
+ 
 -        return naturalEndpoints.size() + pendingEndpoints.size();
 +        return replicaPlan.liveAndDown().size();
 +    }
 +
 +    public ConsistencyLevel consistencyLevel()
 +    {
 +        return replicaPlan.consistencyLevel();
      }
  
      /**

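In the AbstractWriteResponseHandler hunk, `totalEndpoints()` becomes `candidateReplicaCount()`. For a datacenter-local consistency level, it counts only the replicas in the coordinator's own datacenter. Otherwise it counts every replica in `replicaPlan.liveAndDown()`, which, going by the removed code, covers the natural and pending endpoints. Per the TODO, the handler uses this number when deciding whether a query should fail.

The sketch below is a simplified model of that rule. `CandidateCountSketch`, `Level`, and `Replica` are hypothetical. Unlike `countInOurDc(...).allReplicas()`, the sketch does not distinguish full from transient replicas:

```java
import java.util.List;

public final class CandidateCountSketch {
    // Simplified consistency levels; only the datacenter-local flag matters here.
    enum Level {
        ONE(false), QUORUM(false), ALL(false), LOCAL_ONE(true), LOCAL_QUORUM(true);

        final boolean datacenterLocal;

        Level(boolean datacenterLocal) {
            this.datacenterLocal = datacenterLocal;
        }
    }

    // Hypothetical replica: an endpoint and the datacenter it lives in.
    static final class Replica {
        final String endpoint;
        final String dc;

        Replica(String endpoint, String dc) {
            this.endpoint = endpoint;
            this.dc = dc;
        }
    }

    // liveAndDown: all natural and pending replicas for the token, up or down.
    static int candidateReplicaCount(Level cl, List<Replica> liveAndDown, String localDc) {
        if (cl.datacenterLocal)
            return (int) liveAndDown.stream().filter(r -> r.dc.equals(localDc)).count();
        return liveAndDown.size();
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
            new Replica("10.0.0.1", "dc1"), new Replica("10.0.0.2", "dc1"),
            new Replica("10.0.1.1", "dc2"), new Replica("10.0.1.2", "dc2"),
            new Replica("10.0.1.3", "dc2"));
        System.out.println(candidateReplicaCount(Level.LOCAL_QUORUM, replicas, "dc1")); // 2
        System.out.println(candidateReplicaCount(Level.QUORUM, replicas, "dc1"));       // 5
    }
}
```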