You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/07 14:06:05 UTC
[1/6] cassandra git commit: Ensure SSTableWriter cleans up properly
after failure
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 e27cdf935 -> 3679b1bd2
refs/heads/cassandra-2.1 edf48f817 -> 55750e07d
refs/heads/trunk e1e28d0f0 -> 729ebe078
Ensure SSTableWriter cleans up properly after failure
patch by benedict; reviewed by marcuse
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd
Branch: refs/heads/cassandra-2.0
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 6 ++++
.../io/compress/CompressionMetadata.java | 17 +++++++++
.../cassandra/io/sstable/SSTableWriter.java | 11 ++++--
.../cassandra/io/util/SequentialWriter.java | 37 +++++++++++++++-----
.../cassandra/utils/AlwaysPresentFilter.java | 2 +-
.../org/apache/cassandra/utils/BloomFilter.java | 2 +-
.../org/apache/cassandra/utils/IFilter.java | 2 ++
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 ++
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTINCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressionMetadata
{
+ private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
public final long dataLength;
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
getChannel().force(true);
super.close();
}
+
+ public void abort()
+ {
+ try
+ {
+ super.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
public void abort()
{
assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ iwriter.abort();
+ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
}
catch (IOException e)
{
+ out.abort();
throw new FSWriteError(e, out.getPath());
}
out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
FileUtils.truncate(indexFile.getPath(), position);
}
+ public void abort()
+ {
+ indexFile.abort();
+ bf.close();
+ }
+
public void mark()
{
mark = indexFile.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.channels.ClosedChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
*/
public class SequentialWriter extends OutputStream
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- try
- {
- out.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
+ cleanup(true);
+ }
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
FileUtils.closeQuietly(metadata);
- CLibrary.tryCloseFD(directoryFD);
+
+ try { CLibrary.tryCloseFD(directoryFD); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
public void clear() { }
- public void close() throws IOException { }
+ public void close() { }
public long serializedSize() { return 0; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
bitset.clear();
}
- public void close() throws IOException
+ public void close()
{
bitset.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
* @return the amount of memory in bytes used off heap
*/
long offHeapSize();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
* @return the amount of memory in bytes used off heap
*/
public long offHeapSize();
+
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
return new OffHeapBitSet(memory);
}
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
return (int)((h>>32) ^ h) + 0x98761234;
}
- public void close() throws IOException {
+ public void close() {
// noop, let GC do the cleanup.
}
[6/6] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/729ebe07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/729ebe07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/729ebe07
Branch: refs/heads/trunk
Commit: 729ebe078a67fd1aa779b26a4738b76da23af04d
Parents: e1e28d0 55750e0
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:33 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:33 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../io/compress/CompressedSequentialWriter.java | 6 +++
.../io/compress/CompressionMetadata.java | 9 +++++
.../io/sstable/format/big/BigTableWriter.java | 21 ++++++-----
.../io/util/ChecksummedSequentialWriter.java | 6 +++
.../cassandra/io/util/SequentialWriter.java | 39 ++++++++++++++++----
6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c52184b,0000000..2d34209
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,588 -1,0 +1,591 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class BigTableWriter extends SSTableWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+ // not very random, but the only value that can't be mistaken for a legal column-name length
+ public static final int END_OF_ROW = 0x0000;
+
+ private IndexWriter iwriter;
+ private SegmentedFile.Builder dbuilder;
+ private final SequentialWriter dataFile;
+ private DecoratedKey lastWrittenKey;
+ private FileMark dataMark;
+
+ BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+ iwriter = new IndexWriter(keyCount);
+
+ if (compression)
+ {
+ dataFile = SequentialWriter.open(getFilename(),
+ descriptor.filenameFor(Component.COMPRESSION_INFO),
+ metadata.compressionParameters(),
+ metadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+ }
+ else
+ {
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+ dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ }
+ }
+
+ public void mark()
+ {
+ dataMark = dataFile.mark();
+ iwriter.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ dataFile.resetAndTruncate(dataMark);
+ iwriter.resetAndTruncate();
+ }
+
+ /**
+ * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
+ */
+ private long beforeAppend(DecoratedKey decoratedKey)
+ {
+ assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+ throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
+ return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+ }
+
+ private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
+ {
+ metadataCollector.addKey(decoratedKey.getKey());
+ lastWrittenKey = decoratedKey;
+ last = lastWrittenKey;
+ if (first == null)
+ first = lastWrittenKey;
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote {} at {}", decoratedKey, dataPosition);
+ iwriter.append(decoratedKey, index);
+ dbuilder.addPotentialBoundary(dataPosition);
+ }
+
+ /**
+ * @param row
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ long currentPosition = beforeAppend(row.key);
+ RowIndexEntry entry;
+ try
+ {
+ entry = row.write(currentPosition, dataFile);
+ if (entry == null)
+ return null;
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+ afterAppend(row.key, currentPosition, entry);
+ return entry;
+ }
+
+ public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+ {
+ if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ {
+ logger.error("Key size {} exceeds maximum of {}, skipping row",
+ decoratedKey.getKey().remaining(),
+ FBUtilities.MAX_UNSIGNED_SHORT);
+ return;
+ }
+
+ long startPosition = beforeAppend(decoratedKey);
+ try
+ {
+ RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+ afterAppend(decoratedKey, startPosition, entry);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+ }
+
+ private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+ {
+ assert cf.hasColumns() || cf.isMarkedForDelete();
+
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+ ColumnIndex index = builder.build(cf);
+
+ out.writeShort(END_OF_ROW);
+ return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+ }
+
+ /**
+ * @throws IOException if a read from the DataInput fails
+ * @throws FSWriteError if a write to the dataFile fails
+ */
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+
+ ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+ ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+ ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+ List<ByteBuffer> minColumnNames = Collections.emptyList();
+ List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+ boolean hasLegacyCounterShards = false;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ cf.delete(DeletionTime.serializer.deserialize(in));
+
+ ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+ if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+ {
+ tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ }
+
+ Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+ while (rangeTombstoneIterator.hasNext())
+ {
+ RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+ tombstones.update(rangeTombstone.getLocalDeletionTime());
+ minTimestampTracker.update(rangeTombstone.timestamp());
+ maxTimestampTracker.update(rangeTombstone.timestamp());
+ maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+ }
+
+ Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+ try
+ {
+ while (iter.hasNext())
+ {
+ OnDiskAtom atom = iter.next();
+ if (atom == null)
+ break;
+
+ if (atom instanceof CounterCell)
+ {
+ atom = ((CounterCell) atom).markLocalToBeCleared();
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+ }
+
+ int deletionTime = atom.getLocalDeletionTime();
+ if (deletionTime < Integer.MAX_VALUE)
+ tombstones.update(deletionTime);
+ minTimestampTracker.update(atom.timestamp());
+ maxTimestampTracker.update(atom.timestamp());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+ maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+ columnIndexer.add(atom); // This write the atom on disk too
+ }
+
+ columnIndexer.maybeWriteEmptyRowHeader();
+ dataFile.stream.writeShort(END_OF_ROW);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+
+ metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+ .updateMaxTimestamp(maxTimestampTracker.get())
+ .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+ .addRowSize(dataFile.getFilePointer() - currentPosition)
+ .addColumnCount(columnIndexer.writtenAtomCount())
+ .mergeTombstoneHistogram(tombstones)
+ .updateMinColumnNames(minColumnNames)
+ .updateMaxColumnNames(maxColumnNames)
+ .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+ afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+ return currentPosition;
+ }
+
+ /**
+ * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
+ */
+ public void abort(boolean closeBf)
+ {
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
++
+ if (iwriter != null)
- {
- FileUtils.closeQuietly(iwriter.indexFile);
- if (closeBf)
- {
- iwriter.bf.close();
- }
- }
++ iwriter.abort(closeBf);
++
+ if (dataFile!= null)
- FileUtils.closeQuietly(dataFile);
++ dataFile.abort();
+
+ Set<Component> components = SSTable.componentsFor(descriptor);
+ try
+ {
+ if (!components.isEmpty())
+ SSTable.delete(descriptor, components);
+ }
+ catch (FSWriteError e)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+ throw e;
+ }
+ }
+
+ // we use this method to ensure any managed data we may have retained references to during the write are no
+ // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
+ public void isolateReferences()
+ {
+ // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
+ // data retention is done through copying
+ first = getMinimalKey(first);
+ last = lastWrittenKey = getMinimalKey(last);
+ }
+
+ private Descriptor makeTmpLinks()
+ {
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+ return link;
+ }
+
+ public SSTableReader openEarly(long maxDataAge)
+ {
+ StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),
+ repairedAt).get(MetadataType.STATS);
+
+ // find the max (exclusive) readable key
+ DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+ if (exclusiveUpperBoundOfReadableIndex == null)
+ return null;
+
+ Descriptor link = makeTmpLinks();
+ // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+
+ // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+ DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
+ // Prevent leaving tmplink files on disk
+ sstable.releaseReference();
+ return null;
+ }
+ int offset = 2;
+ while (true)
+ {
+ RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+ if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+ break;
+ inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
+ sstable.releaseReference();
+ return null;
+ }
+ }
+ sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ return sstable;
+ }
+
+ public SSTableReader closeAndOpenReader()
+ {
+ return closeAndOpenReader(System.currentTimeMillis());
+ }
+
+ public SSTableReader closeAndOpenReader(long maxDataAge)
+ {
+ return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
+ }
+
+ public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
+ {
+ Pair<Descriptor, StatsMetadata> p;
+
+ p = close(finishType, repairedAt);
+ Descriptor desc = p.left;
+ StatsMetadata metadata = p.right;
+
+ if (finishType == FinishType.EARLY)
+ desc = makeTmpLinks();
+
+ // finalize in-memory state for the reader
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
+ components,
+ this.metadata,
+ partitioner,
+ ifile,
+ dfile,
+ iwriter.summary.build(partitioner),
+ iwriter.bf,
+ maxDataAge,
+ metadata,
+ finishType.openReason);
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(last);
+
+ switch (finishType)
+ {
+ case NORMAL: case FINISH_EARLY:
+ // try to save the summaries to disk
+ sstable.saveSummary(iwriter.builder, dbuilder);
+ iwriter = null;
+ dbuilder = null;
+ }
+ return sstable;
+ }
+
+ // Close the writer and return the descriptor to the new sstable and its metadata
+ public Pair<Descriptor, StatsMetadata> close()
+ {
+ return close(FinishType.NORMAL, this.repairedAt);
+ }
+
+ private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
+ {
+ switch (type)
+ {
+ case EARLY: case NORMAL:
+ iwriter.close();
+ dataFile.close();
+ }
+
+ // write sstable statistics
+ Map<MetadataType, MetadataComponent> metadataComponents;
+ metadataComponents = metadataCollector
+ .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),repairedAt);
+
+ // remove the 'tmp' marker from all components
+ Descriptor descriptor = this.descriptor;
+ switch (type)
+ {
+ case NORMAL: case FINISH_EARLY:
+ dataFile.writeFullChecksum(descriptor);
+ writeMetadata(descriptor, metadataComponents);
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+ descriptor = rename(descriptor, components);
+ }
+
+ return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ }
+
+ private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+ {
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
+ try
+ {
+ desc.getMetadataSerializer().serialize(components, out.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
+ finally
+ {
+ out.close();
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public long getOnDiskFilePointer()
+ {
+ return dataFile.getOnDiskFilePointer();
+ }
+
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
- class IndexWriter implements Closeable
++ class IndexWriter
+ {
+ private final SequentialWriter indexFile;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummaryBuilder summary;
+ public final IFilter bf;
+ private FileMark mark;
+
+ IndexWriter(long keyCount)
+ {
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+ bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+ }
+
+ // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+ DecoratedKey getMaxReadableKey(int offset)
+ {
+ long maxIndexLength = indexFile.getLastFlushOffset();
+ return summary.getMaxReadableKey(maxIndexLength, offset);
+ }
+
+ public void append(DecoratedKey key, RowIndexEntry indexEntry)
+ {
+ bf.add(key.getKey());
+ long indexPosition = indexFile.getFilePointer();
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, indexFile.getPath());
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
+
+ summary.maybeAddEntry(key, indexPosition);
+ builder.addPotentialBoundary(indexPosition);
+ }
+
++ public void abort(boolean closeBf)
++ {
++ indexFile.abort();
++ if (closeBf)
++ bf.close();
++ }
++
+ /**
+ * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
+ */
+ public void close()
+ {
+ if (components.contains(Component.FILTER))
+ {
+ String path = descriptor.filenameFor(Component.FILTER);
+ try
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
+ FilterFactory.serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
+ }
+
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+ }
+
+ public void mark()
+ {
+ mark = indexFile.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+ // we can't reset dbuilder either, but that is the last thing called in afterappend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.resetAndTruncate(mark);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/729ebe07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
[2/6] cassandra git commit: Ensure SSTableWriter cleans up properly
after failure
Posted by be...@apache.org.
Ensure SSTableWriter cleans up properly after failure
patch by benedict; reviewed by marcuse
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd
Branch: refs/heads/cassandra-2.1
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 6 ++++
.../io/compress/CompressionMetadata.java | 17 +++++++++
.../cassandra/io/sstable/SSTableWriter.java | 11 ++++--
.../cassandra/io/util/SequentialWriter.java | 37 +++++++++++++++-----
.../cassandra/utils/AlwaysPresentFilter.java | 2 +-
.../org/apache/cassandra/utils/BloomFilter.java | 2 +-
.../org/apache/cassandra/utils/IFilter.java | 2 ++
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 ++
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTINCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressionMetadata
{
+ private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
public final long dataLength;
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
getChannel().force(true);
super.close();
}
+
+ public void abort()
+ {
+ try
+ {
+ super.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
public void abort()
{
assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ iwriter.abort();
+ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
}
catch (IOException e)
{
+ out.abort();
throw new FSWriteError(e, out.getPath());
}
out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
FileUtils.truncate(indexFile.getPath(), position);
}
+ public void abort()
+ {
+ indexFile.abort();
+ bf.close();
+ }
+
public void mark()
{
mark = indexFile.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.channels.ClosedChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
*/
public class SequentialWriter extends OutputStream
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- try
- {
- out.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
+ cleanup(true);
+ }
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
FileUtils.closeQuietly(metadata);
- CLibrary.tryCloseFD(directoryFD);
+
+ try { CLibrary.tryCloseFD(directoryFD); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
public void clear() { }
- public void close() throws IOException { }
+ public void close() { }
public long serializedSize() { return 0; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
bitset.clear();
}
- public void close() throws IOException
+ public void close()
{
bitset.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
* @return the amount of memory in bytes used off heap
*/
long offHeapSize();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
* @return the amount of memory in bytes used off heap
*/
public long offHeapSize();
+
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
return new OffHeapBitSet(memory);
}
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
return (int)((h>>32) ^ h) + 0x98761234;
}
- public void close() throws IOException {
+ public void close() {
// noop, let GC do the cleanup.
}
[5/6] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by be...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
src/java/org/apache/cassandra/io/util/SequentialWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07
Branch: refs/heads/cassandra-2.1
Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24
Parents: edf48f8 3679b1b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:16 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../io/compress/CompressedSequentialWriter.java | 6 +++
.../io/compress/CompressionMetadata.java | 9 +++++
.../cassandra/io/sstable/SSTableWriter.java | 21 ++++++-----
.../io/util/ChecksummedSequentialWriter.java | 6 +++
.../cassandra/io/util/SequentialWriter.java | 39 ++++++++++++++++----
6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1c1bfe2,7aad4c0..372972d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,5 +1,54 @@@
+2.1.3
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
++=======
+ 2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTINCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index d3c41fa,909d822..81bb3e9
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
+ @Override
+ public void writeFullChecksum(Descriptor descriptor)
+ {
+ crcMetadata.writeFullChecksum(descriptor);
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 173722f,5b0154b..f19d502
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat
*/
public void resetAndTruncate(int chunkIndex)
{
+ count = chunkIndex;
+ }
+
+ public void close(long dataLength, int chunks) throws IOException
+ {
+ DataOutputStream out = null;
try
{
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
- getChannel().truncate(getFilePointer());
+ out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
+ assert chunks == count;
+ writeHeader(out, dataLength, chunks);
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
}
- catch (IOException e)
+ finally
{
- throw new FSWriteError(e, filePath);
+ FileUtils.closeQuietly(out);
}
}
+
- public void close() throws IOException
- {
- if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
- getChannel().force(true);
- super.close();
- }
-
+ public void abort()
+ {
- try
- {
- super.close();
- }
- catch (Throwable t)
++ if (offsets != null)
+ {
- logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
++ offsets.unreference();
++ offsets = null;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 595012d,08e5527..b0365ad
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa
*/
public void abort()
{
- assert descriptor.temporary;
- iwriter.abort();
- dataFile.abort();
+ abort(true);
+ }
+ public void abort(boolean closeBf)
+ {
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
++
+ if (iwriter != null)
- {
- FileUtils.closeQuietly(iwriter.indexFile);
- if (closeBf)
- {
- iwriter.bf.close();
- }
- }
++ iwriter.abort(closeBf);
++
+ if (dataFile!= null)
- FileUtils.closeQuietly(dataFile);
++ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@@ -589,7 -431,7 +585,7 @@@
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
*/
-- class IndexWriter implements Closeable
++ class IndexWriter
{
private final SequentialWriter indexFile;
public final SegmentedFile.Builder builder;
@@@ -633,6 -469,6 +629,13 @@@
builder.addPotentialBoundary(indexPosition);
}
++ public void abort(boolean closeBf)
++ {
++ indexFile.abort();
++ if (closeBf)
++ bf.close();
++ }
++
/**
* Closes the index and bloomfilter, making the public state of this writer valid for consumption.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index b95bf32,0000000..f4281b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@@ -1,53 -1,0 +1,59 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+public class ChecksummedSequentialWriter extends SequentialWriter
+{
+ private final SequentialWriter crcWriter;
+ private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
+
+ public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
+ {
+ super(file, bufferSize);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024);
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
+ crcMetadata.writeChunkSize(buffer.length);
+ }
+
+ protected void flushData()
+ {
+ super.flushData();
+ crcMetadata.append(buffer, 0, validBufferBytes);
+ }
+
+ public void writeFullChecksum(Descriptor descriptor)
+ {
+ crcMetadata.writeFullChecksum(descriptor);
+ }
+
+ public void close()
+ {
+ super.close();
+ crcWriter.close();
+ }
++
++ public void abort()
++ {
++ super.abort();
++ crcWriter.abort();
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 7a7eb63,b980cf1..227c79d
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -18,10 -18,11 +18,13 @@@
package org.apache.cassandra.io.util;
import java.io.*;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
-public class SequentialWriter extends OutputStream
+public class SequentialWriter extends OutputStream implements WritableByteChannel
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
// absolute path to the given file
private final String filePath;
- // so we can use the write(int) path w/o tons of new byte[] allocations
- private final byte[] singleByteBuffer = new byte[1];
-
protected byte[] buffer;
- private final boolean skipIOCache;
private final int fd;
-- private final int directoryFD;
++ private int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
private boolean directorySynced = false;
@@@ -437,21 -387,47 +442,39 @@@
buffer = null;
- try
- {
- out.close();
- }
- catch (IOException e)
- if (skipIOCache && bytesSinceCacheFlush > 0)
- CLibrary.trySkipCache(fd, 0, 0);
-
+ cleanup(true);
+ }
+
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
- FileUtils.closeQuietly(metadata);
-
- try { CLibrary.tryCloseFD(directoryFD); }
- catch (Throwable t) { handle(t, throwExceptions); }
++ if (directoryFD >= 0)
+ {
- throw new FSWriteError(e, getPath());
++ try { CLibrary.tryCloseFD(directoryFD); }
++ catch (Throwable t) { handle(t, throwExceptions); }
++ directoryFD = -1;
+ }
- CLibrary.tryCloseFD(directoryFD);
++ // close is idempotent
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
- /**
- * Turn on digest computation on this writer.
- * This can only be called before any data is written to this write,
- * otherwise an IllegalStateException is thrown.
- */
- public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
+ // hack to make life easier for subclasses
+ public void writeFullChecksum(Descriptor descriptor)
{
- if (current != 0)
- throw new IllegalStateException();
- metadata = writer;
- metadata.writeChunkSize(buffer.length);
}
/**
[3/6] cassandra git commit: Ensure SSTableWriter cleans up properly
after failure
Posted by be...@apache.org.
Ensure SSTableWriter cleans up properly after failure
patch by benedict; reviewed by marcuse
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3679b1bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3679b1bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3679b1bd
Branch: refs/heads/trunk
Commit: 3679b1bd270fcfbbae4c8ac9eb5114cc98fbe4de
Parents: e27cdf9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 12:54:48 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 12:54:48 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 6 ++++
.../io/compress/CompressionMetadata.java | 17 +++++++++
.../cassandra/io/sstable/SSTableWriter.java | 11 ++++--
.../cassandra/io/util/SequentialWriter.java | 37 +++++++++++++++-----
.../cassandra/utils/AlwaysPresentFilter.java | 2 +-
.../org/apache/cassandra/utils/BloomFilter.java | 2 +-
.../org/apache/cassandra/utils/IFilter.java | 2 ++
.../org/apache/cassandra/utils/obs/IBitSet.java | 2 ++
.../cassandra/utils/obs/OffHeapBitSet.java | 2 +-
.../apache/cassandra/utils/obs/OpenBitSet.java | 2 +-
11 files changed, 69 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c8cf1d4..7aad4c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTINCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eef5b17..909d822 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -271,6 +271,12 @@ public class CompressedSequentialWriter extends SequentialWriter
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 231778a..5b0154b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -23,6 +23,9 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
@@ -40,6 +43,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressionMetadata
{
+ private static final Logger logger = LoggerFactory.getLogger(CompressionMetadata.class);
+
public final long dataLength;
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
@@ -375,6 +380,18 @@ public class CompressionMetadata
getChannel().force(true);
super.close();
}
+
+ public void abort()
+ {
+ try
+ {
+ super.close();
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index afa066d..08e5527 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -314,8 +314,8 @@ public class SSTableWriter extends SSTable
public void abort()
{
assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ iwriter.abort();
+ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -391,6 +391,7 @@ public class SSTableWriter extends SSTable
}
catch (IOException e)
{
+ out.abort();
throw new FSWriteError(e, out.getPath());
}
out.close();
@@ -498,6 +499,12 @@ public class SSTableWriter extends SSTable
FileUtils.truncate(indexFile.getPath(), position);
}
+ public void abort()
+ {
+ indexFile.abort();
+ bf.close();
+ }
+
public void mark()
{
mark = indexFile.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dc95676..b980cf1 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.channels.ClosedChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -31,6 +34,8 @@ import org.apache.cassandra.utils.CLibrary;
*/
public class SequentialWriter extends OutputStream
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
@@ -385,17 +390,31 @@ public class SequentialWriter extends OutputStream
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- try
- {
- out.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
+ cleanup(true);
+ }
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
FileUtils.closeQuietly(metadata);
- CLibrary.tryCloseFD(directoryFD);
+
+ try { CLibrary.tryCloseFD(directoryFD); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0886edc..2ba4400 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -31,7 +31,7 @@ public class AlwaysPresentFilter implements IFilter
public void clear() { }
- public void close() throws IOException { }
+ public void close() { }
public long serializedSize() { return 0; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index 9fbb38e..e50a746 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -112,7 +112,7 @@ public abstract class BloomFilter implements IFilter
bitset.clear();
}
- public void close() throws IOException
+ public void close()
{
bitset.close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index aed5f39..f0771c6 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -35,4 +35,6 @@ public interface IFilter extends Closeable
* @return the amount of memory in bytes used off heap
*/
long offHeapSize();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 47ba492..42db722 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -55,4 +55,6 @@ public interface IBitSet extends Closeable
* @return the amount of memory in bytes used off heap
*/
public long offHeapSize();
+
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 5063d80..7d47d14 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -145,7 +145,7 @@ public class OffHeapBitSet implements IBitSet
return new OffHeapBitSet(memory);
}
- public void close() throws IOException
+ public void close()
{
bytes.free();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3679b1bd/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index 3e1efce..b1abe08 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -393,7 +393,7 @@ public class OpenBitSet implements IBitSet
return (int)((h>>32) ^ h) + 0x98761234;
}
- public void close() throws IOException {
+ public void close() {
// noop, let GC do the cleanup.
}
[4/6] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by be...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
src/java/org/apache/cassandra/io/util/SequentialWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07
Branch: refs/heads/trunk
Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24
Parents: edf48f8 3679b1b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 13:05:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 13:05:16 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../io/compress/CompressedSequentialWriter.java | 6 +++
.../io/compress/CompressionMetadata.java | 9 +++++
.../cassandra/io/sstable/SSTableWriter.java | 21 ++++++-----
.../io/util/ChecksummedSequentialWriter.java | 6 +++
.../cassandra/io/util/SequentialWriter.java | 39 ++++++++++++++++----
6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1c1bfe2,7aad4c0..372972d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,5 +1,54 @@@
+2.1.3
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
++=======
+ 2.0.12:
+ * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
* Increase bf true positive count on key cache hit (CASSANDRA-8525)
* Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTNCT queries on static columns when
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index d3c41fa,909d822..81bb3e9
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite
}
}
+ public void abort()
+ {
+ super.abort();
+ metadataWriter.abort();
+ }
+
+ @Override
+ public void writeFullChecksum(Descriptor descriptor)
+ {
+ crcMetadata.writeFullChecksum(descriptor);
+ }
+
/**
* Class to hold a mark to the position of the file
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 173722f,5b0154b..f19d502
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat
*/
public void resetAndTruncate(int chunkIndex)
{
+ count = chunkIndex;
+ }
+
+ public void close(long dataLength, int chunks) throws IOException
+ {
+ DataOutputStream out = null;
try
{
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
- getChannel().truncate(getFilePointer());
+ out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
+ assert chunks == count;
+ writeHeader(out, dataLength, chunks);
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
}
- catch (IOException e)
+ finally
{
- throw new FSWriteError(e, filePath);
+ FileUtils.closeQuietly(out);
}
}
+
- public void close() throws IOException
- {
- if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
- getChannel().force(true);
- super.close();
- }
-
+ public void abort()
+ {
- try
- {
- super.close();
- }
- catch (Throwable t)
++ if (offsets != null)
+ {
- logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
++ offsets.unreference();
++ offsets = null;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 595012d,08e5527..b0365ad
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa
*/
public void abort()
{
- assert descriptor.temporary;
- iwriter.abort();
- dataFile.abort();
+ abort(true);
+ }
+ public void abort(boolean closeBf)
+ {
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
++
+ if (iwriter != null)
- {
- FileUtils.closeQuietly(iwriter.indexFile);
- if (closeBf)
- {
- iwriter.bf.close();
- }
- }
++ iwriter.abort(closeBf);
++
+ if (dataFile!= null)
- FileUtils.closeQuietly(dataFile);
++ dataFile.abort();
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@@ -589,7 -431,7 +585,7 @@@
/**
* Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
*/
-- class IndexWriter implements Closeable
++ class IndexWriter
{
private final SequentialWriter indexFile;
public final SegmentedFile.Builder builder;
@@@ -633,6 -469,6 +629,13 @@@
builder.addPotentialBoundary(indexPosition);
}
++ public void abort(boolean closeBf)
++ {
++ indexFile.abort();
++ if (closeBf)
++ bf.close();
++ }
++
/**
* Closes the index and bloomfilter, making the public state of this writer valid for consumption.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index b95bf32,0000000..f4281b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@@ -1,53 -1,0 +1,59 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+public class ChecksummedSequentialWriter extends SequentialWriter
+{
+ private final SequentialWriter crcWriter;
+ private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
+
+ public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
+ {
+ super(file, bufferSize);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024);
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
+ crcMetadata.writeChunkSize(buffer.length);
+ }
+
+ protected void flushData()
+ {
+ super.flushData();
+ crcMetadata.append(buffer, 0, validBufferBytes);
+ }
+
+ public void writeFullChecksum(Descriptor descriptor)
+ {
+ crcMetadata.writeFullChecksum(descriptor);
+ }
+
+ public void close()
+ {
+ super.close();
+ crcWriter.close();
+ }
++
++ public void abort()
++ {
++ super.abort();
++ crcWriter.abort();
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 7a7eb63,b980cf1..227c79d
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -18,10 -18,11 +18,13 @@@
package org.apache.cassandra.io.util;
import java.io.*;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
-public class SequentialWriter extends OutputStream
+public class SequentialWriter extends OutputStream implements WritableByteChannel
{
+ private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+
// isDirty - true if this.buffer contains any un-synced bytes
protected boolean isDirty = false, syncNeeded = false;
// absolute path to the given file
private final String filePath;
- // so we can use the write(int) path w/o tons of new byte[] allocations
- private final byte[] singleByteBuffer = new byte[1];
-
protected byte[] buffer;
- private final boolean skipIOCache;
private final int fd;
-- private final int directoryFD;
++ private int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
private boolean directorySynced = false;
@@@ -437,21 -387,47 +442,39 @@@
buffer = null;
- try
- {
- out.close();
- }
- catch (IOException e)
- if (skipIOCache && bytesSinceCacheFlush > 0)
- CLibrary.trySkipCache(fd, 0, 0);
-
+ cleanup(true);
+ }
+
+ public void abort()
+ {
+ cleanup(false);
+ }
+
+ private void cleanup(boolean throwExceptions)
+ {
- FileUtils.closeQuietly(metadata);
-
- try { CLibrary.tryCloseFD(directoryFD); }
- catch (Throwable t) { handle(t, throwExceptions); }
++ if (directoryFD >= 0)
+ {
- throw new FSWriteError(e, getPath());
++ try { CLibrary.tryCloseFD(directoryFD); }
++ catch (Throwable t) { handle(t, throwExceptions); }
++ directoryFD = -1;
+ }
- CLibrary.tryCloseFD(directoryFD);
++ // close is idempotent
+ try { out.close(); }
+ catch (Throwable t) { handle(t, throwExceptions); }
+ }
+
+ private void handle(Throwable t, boolean throwExceptions)
+ {
+ if (!throwExceptions)
+ logger.warn("Suppressing exception thrown while aborting writer", t);
+ else
+ throw new FSWriteError(t, getPath());
}
- /**
- * Turn on digest computation on this writer.
- * This can only be called before any data is written to this write,
- * otherwise an IllegalStateException is thrown.
- */
- public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
+ // hack to make life easier for subclasses
+ public void writeFullChecksum(Descriptor descriptor)
{
- if (current != 0)
- throw new IllegalStateException();
- metadata = writer;
- metadata.writeChunkSize(buffer.length);
}
/**