You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/11/20 14:59:09 UTC
[cassandra] branch trunk updated: Close channels on error
This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
View the commit online:
https://github.com/apache/cassandra/commit/c2e11bd4224b2110abe6aa84c8882e85980e3491
The following commit(s) were added to refs/heads/trunk by this push:
new c2e11bd Close channels on error
c2e11bd is described below
commit c2e11bd4224b2110abe6aa84c8882e85980e3491
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Fri Nov 8 15:15:20 2019 -0500
Close channels on error
Patch by Ekaterina Dimitrova, reviewed by Robert Stupp for CASSANDRA-15407
---
CHANGES.txt | 1 +
.../cassandra/hints/ChecksummedDataInput.java | 11 +++++-
.../hints/CompressedChecksummedDataInput.java | 11 +++++-
.../hints/EncryptedChecksummedDataInput.java | 11 +++++-
.../org/apache/cassandra/hints/HintsWriter.java | 12 +++---
.../org/apache/cassandra/io/util/FileHandle.java | 9 +++++
.../org/apache/cassandra/utils/Throwables.java | 45 ++++++++++++++++++++++
7 files changed, 91 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index da57886..d44306c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* Optimise native protocol ASCII string encoding (CASSANDRA-15410)
* Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332)
* Make it possible to resize concurrent read / write thread pools at runtime (CASSANDRA-15277)
+ * Close channels on error (CASSANDRA-15407)
Merged from 2.2:
* In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 6ebc830..30d18fa 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.memory.BufferPool;
@@ -74,7 +75,15 @@ public class ChecksummedDataInput extends RebufferingInputStream
@SuppressWarnings("resource")
public static ChecksummedDataInput open(File file)
{
- return new ChecksummedDataInput(new ChannelProxy(file));
+ ChannelProxy channel = new ChannelProxy(file);
+ try
+ {
+ return new ChecksummedDataInput(channel);
+ }
+ catch (Throwable t)
+ {
+ throw Throwables.cleaned(channel.close(t));
+ }
}
public boolean isEOF()
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index 4982a03..0381b00 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.Throwables;
public final class CompressedChecksummedDataInput extends ChecksummedDataInput
{
@@ -160,7 +161,15 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
long position = input.getPosition();
input.close();
- return new CompressedChecksummedDataInput(new ChannelProxy(input.getPath()), compressor, position);
+ ChannelProxy channel = new ChannelProxy(input.getPath());
+ try
+ {
+ return new CompressedChecksummedDataInput(channel, compressor, position);
+ }
+ catch (Throwable t)
+ {
+ throw Throwables.cleaned(channel.close(t));
+ }
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index a70a443..5edd8a8 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.security.EncryptionUtils;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.Throwables;
public class EncryptedChecksummedDataInput extends ChecksummedDataInput
{
@@ -137,7 +138,15 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
long position = input.getPosition();
input.close();
- return new EncryptedChecksummedDataInput(new ChannelProxy(input.getPath()), cipher, compressor, position);
+ ChannelProxy channel = new ChannelProxy(input.getPath());
+ try
+ {
+ return new EncryptedChecksummedDataInput(channel, cipher, compressor, position);
+ }
+ catch (Throwable t)
+ {
+ throw Throwables.cleaned(channel.close(t));
+ }
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 5997eb4..589802b 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -81,18 +81,18 @@ class HintsWriter implements AutoCloseable
ByteBuffer descriptorBytes = dob.buffer();
updateChecksum(crc, descriptorBytes);
channel.write(descriptorBytes);
+
+ if (descriptor.isEncrypted())
+ return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
+ if (descriptor.isCompressed())
+ return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+ return new HintsWriter(directory, descriptor, file, channel, fd, crc);
}
catch (Throwable e)
{
channel.close();
throw e;
}
-
- if (descriptor.isEncrypted())
- return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
- if (descriptor.isCompressed())
- return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
- return new HintsWriter(directory, descriptor, file, channel, fd, crc);
}
HintsDescriptor descriptor()
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java
index a3afc2f..b705769 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
import static org.apache.cassandra.utils.Throwables.maybeFail;
+import org.apache.cassandra.utils.Throwables;
/**
* {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter}
@@ -341,9 +342,11 @@ public class FileHandle extends SharedCloseableImpl
@SuppressWarnings("resource")
public FileHandle complete(long overrideLength)
{
+ boolean channelOpened = false;
if (channel == null)
{
channel = new ChannelProxy(path);
+ channelOpened = true;
}
ChannelProxy channelCopy = channel.sharedCopy();
@@ -388,6 +391,12 @@ public class FileHandle extends SharedCloseableImpl
catch (Throwable t)
{
channelCopy.close();
+ if (channelOpened)
+ {
+ ChannelProxy c = channel;
+ channel = null;
+ throw Throwables.cleaned(c.close(t));
+ }
throw t;
}
}
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 5d6d96f..9c6da60 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.utils;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -193,4 +196,46 @@ public final class Throwables
}
return Optional.empty();
}
+
+ /**
+ * If the provided throwable is a "wrapping" exception (see below), return the cause of that throwable, otherwise
+ * return its argument untouched.
+ * <p>
+ * We call a "wrapping" exception in the context of that method an exception whose only purpose is to wrap another
+ * exception, and currently this method recognize only 2 exception as "wrapping" ones: {@link ExecutionException}
+ * and {@link CompletionException}.
+ */
+ public static Throwable unwrapped(Throwable t)
+ {
+ Throwable unwrapped = t;
+ while (unwrapped instanceof CompletionException ||
+ unwrapped instanceof ExecutionException ||
+ unwrapped instanceof InvocationTargetException)
+ unwrapped = unwrapped.getCause();
+
+ // I don't think it make sense for those 2 exception classes to ever be used with null causes, but no point
+ // in failing here if this happen. We still wrap the original exception if that happen so we get a sign
+ // that the assumption of this method is wrong.
+ return unwrapped == null
+ ? new RuntimeException("Got wrapping exception not wrapping anything", t)
+ : unwrapped;
+ }
+
+ /**
+ * If the provided exception is unchecked, return it directly, otherwise wrap it into a {@link RuntimeException}
+ * to make it unchecked.
+ */
+ public static RuntimeException unchecked(Throwable t)
+ {
+ return t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t);
+ }
+
+ /**
+ * A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because this basically removes the annoying
+ * cruft surrounding an exception :).
+ */
+ public static RuntimeException cleaned(Throwable t)
+ {
+ return unchecked(unwrapped(t));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org