You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/07/05 23:54:00 UTC
[cassandra] 01/01: Merge branch 'cassandra-4.0' into cassandra-4.1
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 15bdf2e8e7e7a66dd85bf3f86cc58576a13657be
Merge: 91d08e496e 924cd8f52c
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Jul 5 18:40:36 2022 -0500
Merge branch 'cassandra-4.0' into cassandra-4.1
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamSession.java | 11 +++++++----
.../apache/cassandra/streaming/StreamTransferTask.java | 16 ++++++++--------
.../org/apache/cassandra/distributed/impl/Instance.java | 1 -
4 files changed, 16 insertions(+), 13 deletions(-)
diff --cc CHANGES.txt
index 6cf38e97e1,75999f160c..f9f3c28227
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,5 +1,15 @@@
-4.0.5
- * Make sure existing delayed tasks in StreamTransferTask cannot prevent clean shutdown (CASSANDRA-17706)
+4.1-alpha2
++ * Relax synchronization on StreamSession#onError() to avoid deadlock (CASSANDRA-17706)
+ * Fix AbstractCell#toString throws MarshalException for cell in collection (CASSANDRA-17695)
+ * Add new vtable output option to compactionstats (CASSANDRA-17683)
+ * Fix commitLogUpperBound initialization in AbstractMemtableWithCommitlog (CASSANDRA-17587)
+ * Fix widening to long in getBatchSizeFailThreshold (CASSANDRA-17650)
+ * Fix widening from mebibytes to bytes in IntMebibytesBound (CASSANDRA-17716)
+ * Revert breaking change in nodetool clientstats and expose client options through nodetool clientstats --client-options. (CASSANDRA-17715)
+ * Fix missed nowInSec values in QueryProcessor (CASSANDRA-17458)
+ * Revert removal of withBufferSizeInMB(int size) in CQLSSTableWriter.Builder class and deprecate it in favor of withBufferSizeInMiB(int size) (CASSANDRA-17675)
+ * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619)
+Merged from 4.0:
* SSL storage port in sstableloader is deprecated (CASSANDRA-17602)
* Fix counter write timeouts at ONE (CASSANDRA-17411)
* Fix NPE in getLocalPrimaryRangeForEndpoint (CASSANDRA-17680)
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 2036262e54,a2e479c524..2da7021968
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -644,12 -629,11 +644,12 @@@ public class StreamSession implements I
* after completion or because the peer was down, otherwise sends a {@link SessionFailedMessage} and closes
* the session as {@link State#FAILED}.
*/
-- public synchronized Future<?> onError(Throwable e)
++ public Future<?> onError(Throwable e)
{
- boolean isEofException = e instanceof EOFException;
+ boolean isEofException = e instanceof EOFException || e instanceof ClosedChannelException;
if (isEofException)
{
+ State state = this.state;
if (state.finalState)
{
logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
@@@ -668,10 -652,13 +668,13 @@@
}
logError(e);
- // send session failure message
+
- if (messageSender.connected())
+ if (channel.connected())
- channel.sendControlMessage(new SessionFailedMessage()).syncUninterruptibly();
- // fail session
+ {
- state(State.FAILED); // make sure subsequent error handling sees the session in a final state
- messageSender.sendMessage(new SessionFailedMessage());
++ state(State.FAILED); // make sure subsequent error handling sees the session in a final state
++ channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly();
+ }
+
return closeSession(State.FAILED);
}
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2e4d8bdc44,1fbd5402fb..45fbcc6b07
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -45,7 -46,7 +45,7 @@@ import static org.apache.cassandra.conc
public class StreamTransferTask extends StreamTask
{
private static final Logger logger = LoggerFactory.getLogger(StreamTransferTask.class);
- private static final ScheduledExecutorPlus timeoutExecutor = executorFactory().scheduled("StreamingTransferTaskTimeouts");
- private static final ScheduledThreadPoolExecutor timeoutExecutor = createTimeoutExecutor();
++ private static final ScheduledExecutorPlus timeoutExecutor = executorFactory().scheduled(false, "StreamingTransferTaskTimeouts");
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
private boolean aborted = false;
@@@ -188,8 -169,20 +188,8 @@@
if (!streams.containsKey(sequenceNumber))
return null;
- ScheduledFuture future = timeoutExecutor.scheduleTimeoutWithDelay(() -> StreamTransferTask.this.timeout(sequenceNumber), time, unit);
- ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future);
- ScheduledFuture<?> future = timeoutExecutor.schedule(new Runnable()
- {
- public void run()
- {
- synchronized (StreamTransferTask.this)
- {
- // remove so we don't cancel ourselves
- timeoutTasks.remove(sequenceNumber);
- StreamTransferTask.this.complete(sequenceNumber);
- }
- }
- }, time, unit);
-
++ ScheduledFuture<?> future = timeoutExecutor.scheduleTimeoutWithDelay(() -> StreamTransferTask.this.timeout(sequenceNumber), time, unit);
+ ScheduledFuture<?> prev = timeoutTasks.put(sequenceNumber, future);
assert prev == null;
return future;
}
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index a9cede1fb8,97dd45ec77..a3adfb7e8b
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -32,8 -32,8 +32,7 @@@ import java.util.List
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org