You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/07/05 23:54:00 UTC

[cassandra] 01/01: Merge branch 'cassandra-4.0' into cassandra-4.1

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 15bdf2e8e7e7a66dd85bf3f86cc58576a13657be
Merge: 91d08e496e 924cd8f52c
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Jul 5 18:40:36 2022 -0500

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/streaming/StreamSession.java    | 11 +++++++----
 .../apache/cassandra/streaming/StreamTransferTask.java   | 16 ++++++++--------
 .../org/apache/cassandra/distributed/impl/Instance.java  |  1 -
 4 files changed, 16 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index 6cf38e97e1,75999f160c..f9f3c28227
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,5 +1,15 @@@
 -4.0.5
 - * Make sure existing delayed tasks in StreamTransferTask cannot prevent clean shutdown (CASSANDRA-17706)
 +4.1-alpha2
++ * Relax synchronization on StreamSession#onError() to avoid deadlock (CASSANDRA-17706)
 + * Fix AbstractCell#toString throws MarshalException for cell in collection (CASSANDRA-17695)
 + * Add new vtable output option to compactionstats (CASSANDRA-17683)
 + * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog (CASSANDRA-17587)
 + * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650)
 + * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716)
 + * Revert breaking change in nodetool clientstats and expose client options through nodetool clientstats --client-options. (CASSANDRA-17715)
 + * Fix missed nowInSec values in QueryProcessor (CASSANDRA-17458)
 + * Revert removal of withBufferSizeInMB(int size) in CQLSSTableWriter.Builder class and deprecate it in favor of withBufferSizeInMiB(int size) (CASSANDRA-17675)
 + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619)
 +Merged from 4.0:
   * SSL storage port in sstableloader is deprecated (CASSANDRA-17602)
   * Fix counter write timeouts at ONE (CASSANDRA-17411)
   * Fix NPE in getLocalPrimaryRangeForEndpoint (CASSANDRA-17680)
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 2036262e54,a2e479c524..2da7021968
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -644,12 -629,11 +644,12 @@@ public class StreamSession implements I
       * after completion or because the peer was down, otherwise sends a {@link SessionFailedMessage} and closes
       * the session as {@link State#FAILED}.
       */
--    public synchronized Future<?> onError(Throwable e)
++    public Future<?> onError(Throwable e)
      {
 -        boolean isEofException = e instanceof EOFException;
 +        boolean isEofException = e instanceof EOFException || e instanceof ClosedChannelException;
          if (isEofException)
          {
 +            State state = this.state;
              if (state.finalState)
              {
                  logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
@@@ -668,10 -652,13 +668,13 @@@
          }
  
          logError(e);
-         // send session failure message
+ 
 -        if (messageSender.connected())
 +        if (channel.connected())
-             channel.sendControlMessage(new SessionFailedMessage()).syncUninterruptibly();
-         // fail session
+         {
 -            state(State.FAILED); // make sure subsequent error handling sees the session in a final state
 -            messageSender.sendMessage(new SessionFailedMessage());
++            state(State.FAILED); // make sure subsequent error handling sees the session in a final state 
++            channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
+         }
+ 
          return closeSession(State.FAILED);
      }
  
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2e4d8bdc44,1fbd5402fb..45fbcc6b07
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -45,7 -46,7 +45,7 @@@ import static org.apache.cassandra.conc
  public class StreamTransferTask extends StreamTask
  {
      private static final Logger logger = LoggerFactory.getLogger(StreamTransferTask.class);
-     private static final ScheduledExecutorPlus timeoutExecutor = executorFactory().scheduled("StreamingTransferTaskTimeouts");
 -    private static final ScheduledThreadPoolExecutor timeoutExecutor = createTimeoutExecutor();
++    private static final ScheduledExecutorPlus timeoutExecutor = executorFactory().scheduled(false, "StreamingTransferTaskTimeouts");
  
      private final AtomicInteger sequenceNumber = new AtomicInteger(0);
      private boolean aborted = false;
@@@ -188,8 -169,20 +188,8 @@@
          if (!streams.containsKey(sequenceNumber))
              return null;
  
-         ScheduledFuture future = timeoutExecutor.scheduleTimeoutWithDelay(() -> StreamTransferTask.this.timeout(sequenceNumber), time, unit);
-         ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future);
 -        ScheduledFuture<?> future = timeoutExecutor.schedule(new Runnable()
 -        {
 -            public void run()
 -            {
 -                synchronized (StreamTransferTask.this)
 -                {
 -                    // remove so we don't cancel ourselves
 -                    timeoutTasks.remove(sequenceNumber);
 -                    StreamTransferTask.this.complete(sequenceNumber);
 -                }
 -            }
 -        }, time, unit);
 -
++        ScheduledFuture<?> future = timeoutExecutor.scheduleTimeoutWithDelay(() -> StreamTransferTask.this.timeout(sequenceNumber), time, unit);
+         ScheduledFuture<?> prev = timeoutTasks.put(sequenceNumber, future);
          assert prev == null;
          return future;
      }
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index a9cede1fb8,97dd45ec77..a3adfb7e8b
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -32,8 -32,8 +32,7 @@@ import java.util.List
  import java.util.Map;
  import java.util.Objects;
  import java.util.UUID;
 -import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org