You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/07 14:06:05 UTC

[1/6] cassandra git commit: Ensure SSTableWriter cleans up properly after failure

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 e27cdf935 -> 3679b1bd2
  refs/heads/cassandra-2.1 edf48f817 -> 55750e07d
  refs/heads/trunk e1e28d0f0 -> 729ebe078


Ensure SSTableWriter cleans up properly after failure

patch by benedict; reviewed by marcuse


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd

Branch: refs/heads/cassandra-2.0
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressedSequentialWriter.java |  6 ++++
 .../io/compress/CompressionMetadata.java        | 17 +++++++++
 .../cassandra/io/sstable/SSTableWriter.java     | 11 ++++--
 .../cassandra/io/util/SequentialWriter.java     | 37 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  2 +-
 .../org/apache/cassandra/utils/BloomFilter.java |  2 +-
 .../org/apache/cassandra/utils/IFilter.java     |  2 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  2 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  2 +-
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  2 +-
 11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
  * Increase bf true positive count on key cache hit (CASSANDRA-8525)
  * Move MeteredFlusher to its own thread (CASSANDRA-8485)
  * Fix non-distinct results in DISTNCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
         }
     }
 
+    public void abort()
+    {
+        super.abort();
+        metadataWriter.abort();
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressionMetadata
 {
+    private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
     public final long dataLength;
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
                 getChannel().force(true);
             super.close();
         }
+
+        public void abort()
+        {
+            try
+            {
+                super.close();
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
     public void abort()
     {
         assert descriptor.temporary;
-        FileUtils.closeQuietly(iwriter);
-        FileUtils.closeQuietly(dataFile);
+        iwriter.abort();
+        dataFile.abort();
 
         Set<Component> components = SSTable.componentsFor(descriptor);
         try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
         }
         catch (IOException e)
         {
+            out.abort();
             throw new FSWriteError(e, out.getPath());
         }
         out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
             FileUtils.truncate(indexFile.getPath(), position);
         }
 
+        public void abort()
+        {
+            indexFile.abort();
+            bf.close();
+        }
+
         public void mark()
         {
             mark = indexFile.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
 import java.io.*;
 import java.nio.channels.ClosedChannelException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
  */
 public class SequentialWriter extends OutputStream
 {
+    private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
     // isDirty - true if this.buffer contains any un-synced bytes
     protected boolean isDirty = false, syncNeeded = false;
 
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        try
-        {
-            out.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, getPath());
-        }
+        cleanup(true);
+    }
 
+    public void abort()
+    {
+        cleanup(false);
+    }
+
+    private void cleanup(boolean throwExceptions)
+    {
         FileUtils.closeQuietly(metadata);
-        CLibrary.tryCloseFD(directoryFD);
+
+        try { CLibrary.tryCloseFD(directoryFD); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+
+        try { out.close(); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+    }
+
+    private void handle(Throwable t, boolean throwExceptions)
+    {
+        if (!throwExceptions)
+            logger.warn("Suppressing exception thrown while aborting writer", t);
+        else
+            throw new FSWriteError(t, getPath());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
 
     public void clear() { }
 
-    public void close() throws IOException { }
+    public void close() { }
 
     public long serializedSize() { return 0; }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
         bitset.clear();
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bitset.close();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     long offHeapSize();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     public long offHeapSize();
+
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
         return new OffHeapBitSet(memory);
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bytes.free();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
     return (int)((h>>32) ^ h) + 0x98761234;
   }
 
-  public void close() throws IOException {
+  public void close() {
     // noop, let GC do the cleanup.
   }
 


[6/6] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/729ebe07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/729ebe07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/729ebe07

Branch: refs/heads/trunk
Commit: 729ebe078a67fd1aa779b26a4738b76da23af04d
Parents: e1e28d0 55750e0
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:33 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:33 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../io/compress/CompressedSequentialWriter.java |  6 +++
 .../io/compress/CompressionMetadata.java        |  9 +++++
 .../io/sstable/format/big/BigTableWriter.java   | 21 ++++++-----
 .../io/util/ChecksummedSequentialWriter.java    |  6 +++
 .../cassandra/io/util/SequentialWriter.java     | 39 ++++++++++++++++----
 6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c52184b,0000000..2d34209
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,588 -1,0 +1,591 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.Closeable;
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        iwriter = new IndexWriter(keyCount);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
 +        iwriter.append(decoratedKey, index);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long currentPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(currentPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
 +        afterAppend(row.key, currentPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This write the atom on disk too
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
++
 +        if (iwriter != null)
-         {
-             FileUtils.closeQuietly(iwriter.indexFile);
-             if (closeBf)
-             {
-                 iwriter.bf.close();
-             }
-         }
++            iwriter.abort(closeBf);
++
 +        if (dataFile!= null)
-             FileUtils.closeQuietly(dataFile);
++            dataFile.abort();
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure any managed data we may have retained references to during the write are no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
 +        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
 +        if (exclusiveUpperBoundOfReadableIndex == null)
 +            return null;
 +
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
 +        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
 +        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
 +                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
 +        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
 +        if (inclusiveUpperBoundOfReadableData == null)
 +        {
 +            // Prevent leaving tmplink files on disk
 +            sstable.releaseReference();
 +            return null;
 +        }
 +        int offset = 2;
 +        while (true)
 +        {
 +            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
 +            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
 +                break;
 +            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
 +            if (inclusiveUpperBoundOfReadableData == null)
 +            {
 +                sstable.releaseReference();
 +                return null;
 +            }
 +        }
 +        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
 +        return sstable;
 +    }
 +
 +    public SSTableReader closeAndOpenReader()
 +    {
 +        return closeAndOpenReader(System.currentTimeMillis());
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge)
 +    {
 +        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
 +    }
 +
 +    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
 +    {
 +        Pair<Descriptor, StatsMetadata> p;
 +
 +        p = close(finishType, repairedAt);
 +        Descriptor desc = p.left;
 +        StatsMetadata metadata = p.right;
 +
 +        if (finishType == FinishType.EARLY)
 +            desc = makeTmpLinks();
 +
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
 +        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
 +        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf,
 +                                                           maxDataAge,
 +                                                           metadata,
 +                                                           finishType.openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +
 +        switch (finishType)
 +        {
 +            case NORMAL: case FINISH_EARLY:
 +            // try to save the summaries to disk
 +            sstable.saveSummary(iwriter.builder, dbuilder);
 +            iwriter = null;
 +            dbuilder = null;
 +        }
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor to the new sstable and it's metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
 +        return close(FinishType.NORMAL, this.repairedAt);
 +    }
 +
 +    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
 +    {
 +        switch (type)
 +        {
 +            case EARLY: case NORMAL:
 +            iwriter.close();
 +            dataFile.close();
 +        }
 +
 +        // write sstable statistics
 +        Map<MetadataType, MetadataComponent> metadataComponents;
 +        metadataComponents = metadataCollector
 +                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                               metadata.getBloomFilterFpChance(),repairedAt);
 +
 +        // remove the 'tmp' marker from all components
 +        Descriptor descriptor = this.descriptor;
 +        switch (type)
 +        {
 +            case NORMAL: case FINISH_EARLY:
 +            dataFile.writeFullChecksum(descriptor);
 +            writeMetadata(descriptor, metadataComponents);
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +            descriptor = rename(descriptor, components);
 +        }
 +
 +        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
-     class IndexWriter implements Closeable
++    class IndexWriter
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
 +        DecoratedKey getMaxReadableKey(int offset)
 +        {
 +            long maxIndexLength = indexFile.getLastFlushOffset();
 +            return summary.getMaxReadableKey(maxIndexLength, offset);
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry)
 +        {
 +            bf.add(key.getKey());
 +            long indexPosition = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
 +
 +            summary.maybeAddEntry(key, indexPosition);
 +            builder.addPotentialBoundary(indexPosition);
 +        }
 +
++        public void abort(boolean closeBf)
++        {
++            indexFile.abort();
++            if (closeBf)
++                bf.close();
++        }
++
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing called in afterappend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------


[2/6] cassandra git commit: Ensure SSTableWriter cleans up properly after failure

Posted by be...@apache.org.
Ensure SSTableWriter cleans up properly after failure

patch by benedict; reviewed by marcuse


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd

Branch: refs/heads/cassandra-2.1
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressedSequentialWriter.java |  6 ++++
 .../io/compress/CompressionMetadata.java        | 17 +++++++++
 .../cassandra/io/sstable/SSTableWriter.java     | 11 ++++--
 .../cassandra/io/util/SequentialWriter.java     | 37 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  2 +-
 .../org/apache/cassandra/utils/BloomFilter.java |  2 +-
 .../org/apache/cassandra/utils/IFilter.java     |  2 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  2 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  2 +-
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  2 +-
 11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
  * Increase bf true positive count on key cache hit (CASSANDRA-8525)
  * Move MeteredFlusher to its own thread (CASSANDRA-8485)
  * Fix non-distinct results in DISTNCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
         }
     }
 
+    public void abort()
+    {
+        super.abort();
+        metadataWriter.abort();
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressionMetadata
 {
+    private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
     public final long dataLength;
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
                 getChannel().force(true);
             super.close();
         }
+
+        public void abort()
+        {
+            try
+            {
+                super.close();
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
     public void abort()
     {
         assert descriptor.temporary;
-        FileUtils.closeQuietly(iwriter);
-        FileUtils.closeQuietly(dataFile);
+        iwriter.abort();
+        dataFile.abort();
 
         Set<Component> components = SSTable.componentsFor(descriptor);
         try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
         }
         catch (IOException e)
         {
+            out.abort();
             throw new FSWriteError(e, out.getPath());
         }
         out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
             FileUtils.truncate(indexFile.getPath(), position);
         }
 
+        public void abort()
+        {
+            indexFile.abort();
+            bf.close();
+        }
+
         public void mark()
         {
             mark = indexFile.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
 import java.io.*;
 import java.nio.channels.ClosedChannelException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
  */
 public class SequentialWriter extends OutputStream
 {
+    private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
     // isDirty - true if this.buffer contains any un-synced bytes
     protected boolean isDirty = false, syncNeeded = false;
 
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        try
-        {
-            out.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, getPath());
-        }
+        cleanup(true);
+    }
 
+    public void abort()
+    {
+        cleanup(false);
+    }
+
+    private void cleanup(boolean throwExceptions)
+    {
         FileUtils.closeQuietly(metadata);
-        CLibrary.tryCloseFD(directoryFD);
+
+        try { CLibrary.tryCloseFD(directoryFD); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+
+        try { out.close(); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+    }
+
+    private void handle(Throwable t, boolean throwExceptions)
+    {
+        if (!throwExceptions)
+            logger.warn("Suppressing exception thrown while aborting writer", t);
+        else
+            throw new FSWriteError(t, getPath());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
 
     public void clear() { }
 
-    public void close() throws IOException { }
+    public void close() { }
 
     public long serializedSize() { return 0; }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
         bitset.clear();
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bitset.close();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     long offHeapSize();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     public long offHeapSize();
+
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
         return new OffHeapBitSet(memory);
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bytes.free();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
     return (int)((h>>32) ^ h) + 0x98761234;
   }
 
-  public void close() throws IOException {
+  public void close() {
     // noop, let GC do the cleanup.
   }
 


[5/6] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by be...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
	src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
	src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
	src/java/org/apache/cassandra/io/util/SequentialWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07

Branch: refs/heads/cassandra-2.1
Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24
Parents: edf48f8 3679b1b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../io/compress/CompressedSequentialWriter.java |  6 +++
 .../io/compress/CompressionMetadata.java        |  9 +++++
 .../cassandra/io/sstable/SSTableWriter.java     | 21 ++++++-----
 .../io/util/ChecksummedSequentialWriter.java    |  6 +++
 .../cassandra/io/util/SequentialWriter.java     | 39 ++++++++++++++++----
 6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1c1bfe2,7aad4c0..372972d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,5 +1,54 @@@
 +2.1.3
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
++=======
+ 2.0.12:
+  * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
   * Increase bf true positive count on key cache hit (CASSANDRA-8525)
   * Move MeteredFlusher to its own thread (CASSANDRA-8485)
   * Fix non-distinct results in DISTNCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index d3c41fa,909d822..81bb3e9
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite
          }
      }
  
+     public void abort()
+     {
+         super.abort();
+         metadataWriter.abort();
+     }
+ 
 +    @Override
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
      /**
       * Class to hold a mark to the position of the file
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 173722f,5b0154b..f19d502
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat
           */
          public void resetAndTruncate(int chunkIndex)
          {
 +            count = chunkIndex;
 +        }
 +
 +        public void close(long dataLength, int chunks) throws IOException
 +        {
 +            DataOutputStream out = null;
              try
              {
 -                seek(dataLengthOffset
 -                     + 8 // size reserved for uncompressed data length
 -                     + 4 // size reserved for chunk count
 -                     + (chunkIndex * 8L));
 -                getChannel().truncate(getFilePointer());
 +            	out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
 +	            assert chunks == count;
 +	            writeHeader(out, dataLength, chunks);
 +                for (int i = 0 ; i < count ; i++)
 +                    out.writeLong(offsets.getLong(i * 8));
              }
 -            catch (IOException e)
 +            finally
              {
 -                throw new FSWriteError(e, filePath);
 +                FileUtils.closeQuietly(out);
              }
          }
+ 
 -        public void close() throws IOException
 -        {
 -            if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
 -                getChannel().force(true);
 -            super.close();
 -        }
 -
+         public void abort()
+         {
 -            try
 -            {
 -                super.close();
 -            }
 -            catch (Throwable t)
++            if (offsets != null)
+             {
 -                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
++                offsets.unreference();
++                offsets = null;
+             }
+         }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 595012d,08e5527..b0365ad
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa
       */
      public void abort()
      {
 -        assert descriptor.temporary;
 -        iwriter.abort();
 -        dataFile.abort();
 +        abort(true);
 +    }
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
++
 +        if (iwriter != null)
-         {
-             FileUtils.closeQuietly(iwriter.indexFile);
-             if (closeBf)
-             {
-                 iwriter.bf.close();
-             }
-         }
++            iwriter.abort(closeBf);
++
 +        if (dataFile!= null)
-             FileUtils.closeQuietly(dataFile);
++            dataFile.abort();
  
          Set<Component> components = SSTable.componentsFor(descriptor);
          try
@@@ -589,7 -431,7 +585,7 @@@
      /**
       * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
       */
--    class IndexWriter implements Closeable
++    class IndexWriter
      {
          private final SequentialWriter indexFile;
          public final SegmentedFile.Builder builder;
@@@ -633,6 -469,6 +629,13 @@@
              builder.addPotentialBoundary(indexPosition);
          }
  
++        public void abort(boolean closeBf)
++        {
++            indexFile.abort();
++            if (closeBf)
++                bf.close();
++        }
++
          /**
           * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
           */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index b95bf32,0000000..f4281b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@@ -1,53 -1,0 +1,59 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.util;
 +
 +import java.io.File;
 +
 +import org.apache.cassandra.io.sstable.Descriptor;
 +
 +public class ChecksummedSequentialWriter extends SequentialWriter
 +{
 +    private final SequentialWriter crcWriter;
 +    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 +
 +    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
 +    {
 +        super(file, bufferSize);
 +        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
 +        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
 +        crcMetadata.writeChunkSize(buffer.length);
 +    }
 +
 +    protected void flushData()
 +    {
 +        super.flushData();
 +        crcMetadata.append(buffer, 0, validBufferBytes);
 +    }
 +
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
 +    public void close()
 +    {
 +        super.close();
 +        crcWriter.close();
 +    }
++
++    public void abort()
++    {
++        super.abort();
++        crcWriter.abort();
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 7a7eb63,b980cf1..227c79d
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -18,10 -18,11 +18,13 @@@
  package org.apache.cassandra.io.util;
  
  import java.io.*;
 +import java.nio.ByteBuffer;
  import java.nio.channels.ClosedChannelException;
 +import java.nio.channels.WritableByteChannel;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSReadError;
  import org.apache.cassandra.io.FSWriteError;
@@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr
   * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on close; we may also
   * fsync incrementally if Config.trickle_fsync is enabled.
   */
 -public class SequentialWriter extends OutputStream
 +public class SequentialWriter extends OutputStream implements WritableByteChannel
  {
+     private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+ 
      // isDirty - true if this.buffer contains any un-synced bytes
      protected boolean isDirty = false, syncNeeded = false;
  
      // absolute path to the given file
      private final String filePath;
  
 -    // so we can use the write(int) path w/o tons of new byte[] allocations
 -    private final byte[] singleByteBuffer = new byte[1];
 -
      protected byte[] buffer;
 -    private final boolean skipIOCache;
      private final int fd;
--    private final int directoryFD;
++    private int directoryFD;
      // directory should be synced only after first file sync, in other words, only once per file
      private boolean directorySynced = false;
  
@@@ -437,21 -387,47 +442,39 @@@
  
          buffer = null;
  
-         try
-         {
-             out.close();
-         }
-         catch (IOException e)
 -        if (skipIOCache && bytesSinceCacheFlush > 0)
 -            CLibrary.trySkipCache(fd, 0, 0);
 -
+         cleanup(true);
+     }
+ 
+     public void abort()
+     {
+         cleanup(false);
+     }
+ 
+     private void cleanup(boolean throwExceptions)
+     {
 -        FileUtils.closeQuietly(metadata);
 -
 -        try { CLibrary.tryCloseFD(directoryFD); }
 -        catch (Throwable t) { handle(t, throwExceptions); }
++        if (directoryFD >= 0)
 +        {
-             throw new FSWriteError(e, getPath());
++            try { CLibrary.tryCloseFD(directoryFD); }
++            catch (Throwable t) { handle(t, throwExceptions); }
++            directoryFD = -1;
 +        }
  
-         CLibrary.tryCloseFD(directoryFD);
++        // close is idempotent
+         try { out.close(); }
+         catch (Throwable t) { handle(t, throwExceptions); }
+     }
+ 
+     private void handle(Throwable t, boolean throwExceptions)
+     {
+         if (!throwExceptions)
+             logger.warn("Suppressing exception thrown while aborting writer", t);
+         else
+             throw new FSWriteError(t, getPath());
      }
  
 -    /**
 -     * Turn on digest computation on this writer.
 -     * This can only be called before any data is written to this write,
 -     * otherwise an IllegalStateException is thrown.
 -     */
 -    public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
 +    // hack to make life easier for subclasses
 +    public void writeFullChecksum(Descriptor descriptor)
      {
 -        if (current != 0)
 -            throw new IllegalStateException();
 -        metadata = writer;
 -        metadata.writeChunkSize(buffer.length);
      }
  
      /**


[3/6] cassandra git commit: Ensure SSTableWriter cleans up properly after failure

Posted by be...@apache.org.
Ensure SSTableWriter cleans up properly after failure

patch by benedict; reviewed by marcuse


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd

Branch: refs/heads/trunk
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressedSequentialWriter.java |  6 ++++
 .../io/compress/CompressionMetadata.java        | 17 +++++++++
 .../cassandra/io/sstable/SSTableWriter.java     | 11 ++++--
 .../cassandra/io/util/SequentialWriter.java     | 37 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  2 +-
 .../org/apache/cassandra/utils/BloomFilter.java |  2 +-
 .../org/apache/cassandra/utils/IFilter.java     |  2 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  2 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  2 +-
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  2 +-
 11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
  * Increase bf true positive count on key cache hit (CASSANDRA-8525)
  * Move MeteredFlusher to its own thread (CASSANDRA-8485)
 * Fix non-distinct results in DISTINCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
         }
     }
 
+    public void abort()
+    {
+        super.abort();
+        metadataWriter.abort();
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressionMetadata
 {
+    private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
     public final long dataLength;
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
                 getChannel().force(true);
             super.close();
         }
+
+        public void abort()
+        {
+            try
+            {
+                super.close();
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
     public void abort()
     {
         assert descriptor.temporary;
-        FileUtils.closeQuietly(iwriter);
-        FileUtils.closeQuietly(dataFile);
+        iwriter.abort();
+        dataFile.abort();
 
         Set<Component> components = SSTable.componentsFor(descriptor);
         try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
         }
         catch (IOException e)
         {
+            out.abort();
             throw new FSWriteError(e, out.getPath());
         }
         out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
             FileUtils.truncate(indexFile.getPath(), position);
         }
 
+        public void abort()
+        {
+            indexFile.abort();
+            bf.close();
+        }
+
         public void mark()
         {
             mark = indexFile.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
 import java.io.*;
 import java.nio.channels.ClosedChannelException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
  */
 public class SequentialWriter extends OutputStream
 {
+    private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
     // isDirty - true if this.buffer contains any un-synced bytes
     protected boolean isDirty = false, syncNeeded = false;
 
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);
 
-        try
-        {
-            out.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, getPath());
-        }
+        cleanup(true);
+    }
 
+    public void abort()
+    {
+        cleanup(false);
+    }
+
+    private void cleanup(boolean throwExceptions)
+    {
         FileUtils.closeQuietly(metadata);
-        CLibrary.tryCloseFD(directoryFD);
+
+        try { CLibrary.tryCloseFD(directoryFD); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+
+        try { out.close(); }
+        catch (Throwable t) { handle(t, throwExceptions); }
+    }
+
+    private void handle(Throwable t, boolean throwExceptions)
+    {
+        if (!throwExceptions)
+            logger.warn("Suppressing exception thrown while aborting writer", t);
+        else
+            throw new FSWriteError(t, getPath());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
 
     public void clear() { }
 
-    public void close() throws IOException { }
+    public void close() { }
 
     public long serializedSize() { return 0; }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
         bitset.clear();
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bitset.close();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     long offHeapSize();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
      * @return the amount of memory in bytes used off heap
      */
     public long offHeapSize();
+
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
         return new OffHeapBitSet(memory);
     }
 
-    public void close() throws IOException
+    public void close()
     {
         bytes.free();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
     return (int)((h>>32) ^ h) + 0x98761234;
   }
 
-  public void close() throws IOException {
+  public void close() {
     // noop, let GC do the cleanup.
   }
 


[4/6] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by be...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
	src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
	src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
	src/java/org/apache/cassandra/io/util/SequentialWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07

Branch: refs/heads/trunk
Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24
Parents: edf48f8 3679b1b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../io/compress/CompressedSequentialWriter.java |  6 +++
 .../io/compress/CompressionMetadata.java        |  9 +++++
 .../cassandra/io/sstable/SSTableWriter.java     | 21 ++++++-----
 .../io/util/ChecksummedSequentialWriter.java    |  6 +++
 .../cassandra/io/util/SequentialWriter.java     | 39 ++++++++++++++++----
 6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1c1bfe2,7aad4c0..372972d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,5 +1,54 @@@
 +2.1.3
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+ 2.0.12:
+  * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
   * Increase bf true positive count on key cache hit (CASSANDRA-8525)
   * Move MeteredFlusher to its own thread (CASSANDRA-8485)
   * Fix non-distinct results in DISTINCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index d3c41fa,909d822..81bb3e9
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite
          }
      }
  
+     public void abort()
+     {
+         super.abort();
+         metadataWriter.abort();
+     }
+ 
 +    @Override
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
      /**
       * Class to hold a mark to the position of the file
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 173722f,5b0154b..f19d502
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat
           */
          public void resetAndTruncate(int chunkIndex)
          {
 +            count = chunkIndex;
 +        }
 +
 +        public void close(long dataLength, int chunks) throws IOException
 +        {
 +            DataOutputStream out = null;
              try
              {
 -                seek(dataLengthOffset
 -                     + 8 // size reserved for uncompressed data length
 -                     + 4 // size reserved for chunk count
 -                     + (chunkIndex * 8L));
 -                getChannel().truncate(getFilePointer());
 +            	out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
 +	            assert chunks == count;
 +	            writeHeader(out, dataLength, chunks);
 +                for (int i = 0 ; i < count ; i++)
 +                    out.writeLong(offsets.getLong(i * 8));
              }
 -            catch (IOException e)
 +            finally
              {
 -                throw new FSWriteError(e, filePath);
 +                FileUtils.closeQuietly(out);
              }
          }
+ 
 -        public void close() throws IOException
 -        {
 -            if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
 -                getChannel().force(true);
 -            super.close();
 -        }
 -
+         public void abort()
+         {
 -            try
 -            {
 -                super.close();
 -            }
 -            catch (Throwable t)
++            if (offsets != null)
+             {
 -                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
++                offsets.unreference();
++                offsets = null;
+             }
+         }
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 595012d,08e5527..b0365ad
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa
       */
      public void abort()
      {
 -        assert descriptor.temporary;
 -        iwriter.abort();
 -        dataFile.abort();
 +        abort(true);
 +    }
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
++
 +        if (iwriter != null)
-         {
-             FileUtils.closeQuietly(iwriter.indexFile);
-             if (closeBf)
-             {
-                 iwriter.bf.close();
-             }
-         }
++            iwriter.abort(closeBf);
++
 +        if (dataFile!= null)
-             FileUtils.closeQuietly(dataFile);
++            dataFile.abort();
  
          Set<Component> components = SSTable.componentsFor(descriptor);
          try
@@@ -589,7 -431,7 +585,7 @@@
      /**
       * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
       */
--    class IndexWriter implements Closeable
++    class IndexWriter
      {
          private final SequentialWriter indexFile;
          public final SegmentedFile.Builder builder;
@@@ -633,6 -469,6 +629,13 @@@
              builder.addPotentialBoundary(indexPosition);
          }
  
++        public void abort(boolean closeBf)
++        {
++            indexFile.abort();
++            if (closeBf)
++                bf.close();
++        }
++
          /**
           * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
           */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index b95bf32,0000000..f4281b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@@ -1,53 -1,0 +1,59 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.util;
 +
 +import java.io.File;
 +
 +import org.apache.cassandra.io.sstable.Descriptor;
 +
 +public class ChecksummedSequentialWriter extends SequentialWriter
 +{
 +    private final SequentialWriter crcWriter;
 +    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 +
 +    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
 +    {
 +        super(file, bufferSize);
 +        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
 +        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
 +        crcMetadata.writeChunkSize(buffer.length);
 +    }
 +
 +    protected void flushData()
 +    {
 +        super.flushData();
 +        crcMetadata.append(buffer, 0, validBufferBytes);
 +    }
 +
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
 +    public void close()
 +    {
 +        super.close();
 +        crcWriter.close();
 +    }
++
++    public void abort()
++    {
++        super.abort();
++        crcWriter.abort();
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 7a7eb63,b980cf1..227c79d
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -18,10 -18,11 +18,13 @@@
  package org.apache.cassandra.io.util;
  
  import java.io.*;
 +import java.nio.ByteBuffer;
  import java.nio.channels.ClosedChannelException;
 +import java.nio.channels.WritableByteChannel;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSReadError;
  import org.apache.cassandra.io.FSWriteError;
@@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr
   * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on close; we may also
   * fsync incrementally if Config.trickle_fsync is enabled.
   */
 -public class SequentialWriter extends OutputStream
 +public class SequentialWriter extends OutputStream implements WritableByteChannel
  {
+     private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+ 
      // isDirty - true if this.buffer contains any un-synced bytes
      protected boolean isDirty = false, syncNeeded = false;
  
      // absolute path to the given file
      private final String filePath;
  
 -    // so we can use the write(int) path w/o tons of new byte[] allocations
 -    private final byte[] singleByteBuffer = new byte[1];
 -
      protected byte[] buffer;
 -    private final boolean skipIOCache;
      private final int fd;
--    private final int directoryFD;
++    private int directoryFD;
      // directory should be synced only after first file sync, in other words, only once per file
      private boolean directorySynced = false;
  
@@@ -437,21 -387,47 +442,39 @@@
  
          buffer = null;
  
-         try
-         {
-             out.close();
-         }
-         catch (IOException e)
 -        if (skipIOCache && bytesSinceCacheFlush > 0)
 -            CLibrary.trySkipCache(fd, 0, 0);
 -
+         cleanup(true);
+     }
+ 
+     public void abort()
+     {
+         cleanup(false);
+     }
+ 
+     private void cleanup(boolean throwExceptions)
+     {
 -        FileUtils.closeQuietly(metadata);
 -
 -        try { CLibrary.tryCloseFD(directoryFD); }
 -        catch (Throwable t) { handle(t, throwExceptions); }
++        if (directoryFD >= 0)
 +        {
-             throw new FSWriteError(e, getPath());
++            try { CLibrary.tryCloseFD(directoryFD); }
++            catch (Throwable t) { handle(t, throwExceptions); }
++            directoryFD = -1;
 +        }
  
-         CLibrary.tryCloseFD(directoryFD);
++        // close is idempotent
+         try { out.close(); }
+         catch (Throwable t) { handle(t, throwExceptions); }
+     }
+ 
+     private void handle(Throwable t, boolean throwExceptions)
+     {
+         if (!throwExceptions)
+             logger.warn("Suppressing exception thrown while aborting writer", t);
+         else
+             throw new FSWriteError(t, getPath());
      }
  
 -    /**
 -     * Turn on digest computation on this writer.
 -     * This can only be called before any data is written to this write,
 -     * otherwise an IllegalStateException is thrown.
 -     */
 -    public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
 +    // hack to make life easier for subclasses
 +    public void writeFullChecksum(Descriptor descriptor)
      {
 -        if (current != 0)
 -            throw new IllegalStateException();
 -        metadata = writer;
 -        metadata.writeChunkSize(buffer.length);
      }
  
      /**