You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/11/20 14:59:09 UTC

[cassandra] branch trunk updated: Close channels on error

This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


View the commit online:
https://github.com/apache/cassandra/commit/c2e11bd4224b2110abe6aa84c8882e85980e3491

The following commit(s) were added to refs/heads/trunk by this push:
     new c2e11bd  Close channels on error
c2e11bd is described below

commit c2e11bd4224b2110abe6aa84c8882e85980e3491
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Fri Nov 8 15:15:20 2019 -0500

    Close channels on error
    
    Patch by Ekaterina Dimitrova, reviewed by Robert Stupp for CASSANDRA-15407
---
 CHANGES.txt                                        |  1 +
 .../cassandra/hints/ChecksummedDataInput.java      | 11 +++++-
 .../hints/CompressedChecksummedDataInput.java      | 11 +++++-
 .../hints/EncryptedChecksummedDataInput.java       | 11 +++++-
 .../org/apache/cassandra/hints/HintsWriter.java    | 12 +++---
 .../org/apache/cassandra/io/util/FileHandle.java   |  9 +++++
 .../org/apache/cassandra/utils/Throwables.java     | 45 ++++++++++++++++++++++
 7 files changed, 91 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index da57886..d44306c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * Optimise native protocol ASCII string encoding (CASSANDRA-15410)
  * Make sure all exceptions are propagated in DebuggableThreadPoolExecutor (CASSANDRA-15332)
  * Make it possible to resize concurrent read / write thread pools at runtime (CASSANDRA-15277)
+ * Close channels on error (CASSANDRA-15407)
 Merged from 2.2:
  * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
 
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 6ebc830..30d18fa 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.memory.BufferPool;
 
@@ -74,7 +75,15 @@ public class ChecksummedDataInput extends RebufferingInputStream
     @SuppressWarnings("resource")
     public static ChecksummedDataInput open(File file)
     {
-        return new ChecksummedDataInput(new ChannelProxy(file));
+        ChannelProxy channel = new ChannelProxy(file);
+        try
+        {
+            return new ChecksummedDataInput(channel);
+        }
+        catch (Throwable t)
+        {
+            throw Throwables.cleaned(channel.close(t));
+        }
     }
 
     public boolean isEOF()
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
index 4982a03..0381b00 100644
--- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.Throwables;
 
 public final class CompressedChecksummedDataInput extends ChecksummedDataInput
 {
@@ -160,7 +161,15 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput
         long position = input.getPosition();
         input.close();
 
-        return new CompressedChecksummedDataInput(new ChannelProxy(input.getPath()), compressor, position);
+        ChannelProxy channel = new ChannelProxy(input.getPath());
+        try
+        {
+            return new CompressedChecksummedDataInput(channel, compressor, position);
+        }
+        catch (Throwable t)
+        {
+            throw Throwables.cleaned(channel.close(t));
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index a70a443..5edd8a8 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.security.EncryptionUtils;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.Throwables;
 
 public class EncryptedChecksummedDataInput extends ChecksummedDataInput
 {
@@ -137,7 +138,15 @@ public class EncryptedChecksummedDataInput extends ChecksummedDataInput
         long position = input.getPosition();
         input.close();
 
-        return new EncryptedChecksummedDataInput(new ChannelProxy(input.getPath()), cipher, compressor, position);
+        ChannelProxy channel = new ChannelProxy(input.getPath());
+        try
+        {
+            return new EncryptedChecksummedDataInput(channel, cipher, compressor, position);
+        }
+        catch (Throwable t)
+        {
+            throw Throwables.cleaned(channel.close(t));
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 5997eb4..589802b 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -81,18 +81,18 @@ class HintsWriter implements AutoCloseable
             ByteBuffer descriptorBytes = dob.buffer();
             updateChecksum(crc, descriptorBytes);
             channel.write(descriptorBytes);
+
+            if (descriptor.isEncrypted())
+                return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
+            if (descriptor.isCompressed())
+                return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+            return new HintsWriter(directory, descriptor, file, channel, fd, crc);
         }
         catch (Throwable e)
         {
             channel.close();
             throw e;
         }
-
-        if (descriptor.isEncrypted())
-            return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
-        if (descriptor.isCompressed())
-            return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
-        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
     }
 
     HintsDescriptor descriptor()
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java
index a3afc2f..b705769 100644
--- a/src/java/org/apache/cassandra/io/util/FileHandle.java
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import org.apache.cassandra.utils.Throwables;
 
 /**
  * {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter}
@@ -341,9 +342,11 @@ public class FileHandle extends SharedCloseableImpl
         @SuppressWarnings("resource")
         public FileHandle complete(long overrideLength)
         {
+            boolean channelOpened = false;
             if (channel == null)
             {
                 channel = new ChannelProxy(path);
+                channelOpened = true;
             }
 
             ChannelProxy channelCopy = channel.sharedCopy();
@@ -388,6 +391,12 @@ public class FileHandle extends SharedCloseableImpl
             catch (Throwable t)
             {
                 channelCopy.close();
+                if (channelOpened)
+                {
+                    ChannelProxy c = channel;
+                    channel = null;
+                    throw Throwables.cleaned(c.close(t));
+                }
                 throw t;
             }
         }
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 5d6d96f..9c6da60 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.utils;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
@@ -193,4 +196,46 @@ public final class Throwables
         }
         return Optional.empty();
     }
+
+    /**
+     * If the provided throwable is a "wrapping" exception (see below), return its innermost non-wrapping cause,
+     * otherwise return the argument untouched.
+     * <p>
+     * In the context of this method, a "wrapping" exception is one whose only purpose is to wrap another
+     * exception. This method currently recognizes 3 such exceptions: {@link CompletionException},
+     * {@link ExecutionException} and {@link InvocationTargetException}.
+     */
+    public static Throwable unwrapped(Throwable t)
+    {
+        Throwable unwrapped = t;
+        while (unwrapped instanceof CompletionException ||
+               unwrapped instanceof ExecutionException ||
+               unwrapped instanceof InvocationTargetException)
+            unwrapped = unwrapped.getCause();
+
+        // It should not make sense for these wrapping exception classes to ever be used with null causes, but
+        // there is no point in failing here if this happens. We still wrap the original exception in that case
+        // so we get a sign that the assumption of this method is wrong.
+        return unwrapped == null
+               ? new RuntimeException("Got wrapping exception not wrapping anything", t)
+               : unwrapped;
+    }
+
+    /**
+     * If the provided throwable is a {@link RuntimeException}, return it directly, otherwise (including for
+     * {@link Error}s) wrap it into a new {@link RuntimeException} so that it can be thrown as one.
+     */
+    public static RuntimeException unchecked(Throwable t)
+    {
+        return t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t);
+    }
+
+    /**
+     * A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because it strips the wrapping
+     * cruft surrounding an exception and returns the result as a {@link RuntimeException}.
+     */
+    public static RuntimeException cleaned(Throwable t)
+    {
+        return unchecked(unwrapped(t));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org