You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/07 14:06:06 UTC
[2/6] cassandra git commit: Ensure SSTableWriter cleans up properly
after failure
Ensure SSTableWriter cleans up properly after failure
patch by benedict; reviewed by marcuse
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd
Branch: refs/heads/cassandra-2.1
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 6 ++++
.../io/compress/CompressionMetadata.java | 17 +++++++++
.../cassandra/io/sstable/SSTableWriter.java | 11 ++++--
.../cassandra/io/util/SequentialWriter.java | 37 +++++++++++++++-----
.../cassandra/utils/AlwaysPresentFilter.java | 2 +-
.../org/apache/cassandra/utils/BloomFilter.java | 2 +-
.../org/apache/cassandra/utils/IFilter.java | 2 ++
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 ++
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTNCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressionMetadata
{
+ private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
public final long dataLength;
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
getChannel().force(true);
super.close();
}
+
+ public void abort()
+ {
+ try
+ {
+ super.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
public void abort()
{
assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ iwriter.abort();
+ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
}
catch (IOException e)
{
+ out.abort();
throw new FSWriteError(e, out.getPath());
}
out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
FileUtils.truncate(indexFile.getPath(), position);
}
+ public void abort()
+ {
+ indexFile.abort();
+ bf.close();
+ }
+
public void mark()
{
mark = indexFile.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.channels.ClosedChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
*/
public class SequentialWriter extends OutputStream
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- try
- {
- out.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
+ cleanup(true);
+ }
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
FileUtils.closeQuietly(metadata);
- CLibrary.tryCloseFD(directoryFD);
+
+ try { CLibrary.tryCloseFD(directoryFD); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
public void clear() { }
- public void close() throws IOException { }
+ public void close() { }
public long serializedSize() { return 0; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
bitset.clear();
}
- public void close() throws IOException
+ public void close()
{
bitset.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
* @return the amount of memory in bytes used off heap
*/
long offHeapSize();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
* @return the amount of memory in bytes used off heap
*/
public long offHeapSize();
+
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
return new OffHeapBitSet(memory);
}
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
return (int)((h>>32) ^ h) + 0x98761234;
}
- public void close() throws IOException {
+ public void close() {
// noop, let GC do the cleanup.
}