You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/07/30 22:40:58 UTC
[1/2] git commit: clean up SequentialWriter and friends patch by
Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-2116
Updated Branches:
refs/heads/trunk 652ae9a64 -> 9ecda7230
clean up SequentialWriter and friends
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-2116
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ecda723
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ecda723
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ecda723
Branch: refs/heads/trunk
Commit: 9ecda7230c719784a203741d5d548b505cae2969
Parents: 844b9c4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jul 30 15:40:43 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jul 30 15:40:43 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../db/compaction/LeveledCompactionTask.java | 3 +-
.../io/compress/CompressedSequentialWriter.java | 132 +++++++++++----
.../cassandra/io/compress/DeflateCompressor.java | 2 +-
.../apache/cassandra/io/sstable/SSTableWriter.java | 47 ++---
.../apache/cassandra/io/util/SequentialWriter.java | 95 ++++++++---
6 files changed, 189 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 796d56b..7a88c68 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -234,7 +234,7 @@ public class CompactionTask extends AbstractCompactionTask
}
//extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
- protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+ protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
{
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ebc91d7..74e8401 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -62,7 +61,7 @@ public class LeveledCompactionTask extends CompactionTask
}
@Override
- protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException
+ protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
{
return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6d78287..00eb5a7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,11 +17,15 @@
*/
package org.apache.cassandra.io.compress;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -77,30 +81,46 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
- public long getOnDiskFilePointer() throws IOException
+ public long getOnDiskFilePointer()
{
- return out.getFilePointer();
+ try
+ {
+ return out.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
}
@Override
- public void sync() throws IOException
+ public void sync()
{
throw new UnsupportedOperationException();
}
@Override
- public void flush() throws IOException
+ public void flush()
{
throw new UnsupportedOperationException();
}
@Override
- protected void flushData() throws IOException
+ protected void flushData()
{
seekToChunkStart();
- // compressing data with buffer re-use
- int compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+
+ int compressedLength;
+ try
+ {
+ // compressing data with buffer re-use
+ compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Compression exception", e); // shouldn't happen
+ }
originalSize += validBufferBytes;
compressedSize += compressedLength;
@@ -108,16 +128,23 @@ public class CompressedSequentialWriter extends SequentialWriter
// update checksum
checksum.update(buffer, 0, validBufferBytes);
- // write an offset of the newly written chunk to the index file
- metadataWriter.writeLong(chunkOffset);
- chunkCount++;
+ try
+ {
+ // write an offset of the newly written chunk to the index file
+ metadataWriter.writeLong(chunkOffset);
+ chunkCount++;
- assert compressedLength <= compressed.buffer.length;
+ assert compressedLength <= compressed.buffer.length;
- // write data itself
- out.write(compressed.buffer, 0, compressedLength);
- // write corresponding checksum
- out.writeInt((int) checksum.getValue());
+ // write data itself
+ out.write(compressed.buffer, 0, compressedLength);
+ // write corresponding checksum
+ out.writeInt((int) checksum.getValue());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
// reset checksum object to the blank state for re-use
checksum.reset();
@@ -133,11 +160,11 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
- public synchronized void resetAndTruncate(FileMark mark) throws IOException
+ public synchronized void resetAndTruncate(FileMark mark)
{
assert mark instanceof CompressedFileWriterMark;
- CompressedFileWriterMark realMark = ((CompressedFileWriterMark) mark);
+ CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;
// reset position
current = realMark.uncDataOffset;
@@ -161,16 +188,39 @@ public class CompressedSequentialWriter extends SequentialWriter
if (compressed.buffer.length < chunkSize)
compressed.buffer = new byte[chunkSize];
- out.seek(chunkOffset);
- out.readFully(compressed.buffer, 0, chunkSize);
-
- // decompress data chunk and store its length
- int validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
-
- checksum.update(buffer, 0, validBytes);
-
- if (out.readInt() != (int) checksum.getValue())
- throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+ try
+ {
+ out.seek(chunkOffset);
+ out.readFully(compressed.buffer, 0, chunkSize);
+
+ int validBytes;
+ try
+ {
+ // decompress data chunk and store its length
+ validBytes = compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+ }
+
+ checksum.update(buffer, 0, validBytes);
+
+ if (out.readInt() != (int) checksum.getValue())
+ throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
+ }
+ catch (EOFException e)
+ {
+ throw new CorruptSSTableException(new CorruptBlockException(getPath(), chunkOffset, chunkSize), getPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
checksum.reset();
@@ -186,17 +236,24 @@ public class CompressedSequentialWriter extends SequentialWriter
/**
* Seek to the offset where next compressed data chunk should be stored.
- *
- * @throws IOException on any I/O error.
*/
- private void seekToChunkStart() throws IOException
+ private void seekToChunkStart()
{
- if (out.getFilePointer() != chunkOffset)
- out.seek(chunkOffset);
+ if (getOnDiskFilePointer() != chunkOffset)
+ {
+ try
+ {
+ out.seek(chunkOffset);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
+ }
}
@Override
- public void close() throws IOException
+ public void close()
{
if (buffer == null)
return; // already closed
@@ -204,7 +261,14 @@ public class CompressedSequentialWriter extends SequentialWriter
super.close();
sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
metadataWriter.finalizeHeader(current, chunkCount);
- metadataWriter.close();
+ try
+ {
+ metadataWriter.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 97f2ccb..125a08f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -68,7 +68,7 @@ public class DeflateCompressor implements ICompressor
return chunkLength;
}
- public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+ public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset)
{
Deflater def = deflater.get();
def.reset();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e7128f4..2ea71d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.regex.Pattern;
@@ -114,8 +115,7 @@ public class SSTableWriter extends SSTable
iwriter.mark();
}
- // NOT necessarily an FS error - not throwing FSWE.
- public void resetAndTruncate() throws IOException
+ public void resetAndTruncate()
{
dataFile.resetAndTruncate(dataMark);
iwriter.resetAndTruncate();
@@ -149,21 +149,21 @@ public class SSTableWriter extends SSTable
public RowIndexEntry append(AbstractCompactedRow row)
{
+ long currentPosition = beforeAppend(row.key);
try
{
- long currentPosition = beforeAppend(row.key);
ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
long dataStart = dataFile.getFilePointer();
long dataSize = row.write(dataFile.stream);
assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
: "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
- sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
- return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
}
catch (IOException e)
{
throw new FSWriteError(e, dataFile.getPath());
}
+ sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+ return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
}
public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -190,12 +190,12 @@ public class SSTableWriter extends SSTable
dataFile.stream.writeInt(builder.writtenAtomCount());
dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
- sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
}
catch (IOException e)
{
throw new FSWriteError(e, dataFile.getPath());
}
+ sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
}
/**
@@ -318,17 +318,8 @@ public class SSTableWriter extends SSTable
{
// index and filter
iwriter.close();
-
- try
- {
- // main data, close will truncate if necessary
- dataFile.close();
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, dataFile.getPath());
- }
-
+ // main data, close will truncate if necessary
+ dataFile.close();
// write sstable statistics
SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName());
writeMetadata(descriptor, sstableMetadata);
@@ -373,12 +364,12 @@ public class SSTableWriter extends SSTable
try
{
out.write(String.format("%s %s", Hex.bytesToHex(digest), dataFileName).getBytes());
- out.close();
}
- catch (IOException e)
+ catch (ClosedChannelException e)
{
- throw new FSWriteError(e, out.getPath());
+ throw new AssertionError(); // can't happen.
}
+ out.close();
}
private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata)
@@ -387,12 +378,12 @@ public class SSTableWriter extends SSTable
try
{
SSTableMetadata.serializer.serialize(sstableMetadata, out.stream);
- out.close();
}
catch (IOException e)
{
throw new FSWriteError(e, out.getPath());
}
+ out.close();
}
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
@@ -421,7 +412,7 @@ public class SSTableWriter extends SSTable
return dataFile.getFilePointer();
}
- public long getOnDiskFilePointer() throws IOException
+ public long getOnDiskFilePointer()
{
return dataFile.getOnDiskFilePointer();
}
@@ -491,17 +482,17 @@ public class SSTableWriter extends SSTable
stream.flush();
fos.getFD().sync();
stream.close();
-
- // index
- long position = indexFile.getFilePointer();
- indexFile.close(); // calls force
- FileUtils.truncate(indexFile.getPath(), position);
}
catch (IOException e)
{
throw new FSWriteError(e, path);
}
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+
// finalize in-memory index state
summary.complete();
}
@@ -511,7 +502,7 @@ public class SSTableWriter extends SSTable
mark = indexFile.mark();
}
- public void resetAndTruncate() throws IOException
+ public void resetAndTruncate()
{
// we can't un-set the bloom filter addition, but extra keys in there are harmless.
// we can't reset dbuilder either, but that is the last thing called in afterappend so
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ecda723/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 3377226..77d4fcf 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,8 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.CLibrary;
public class SequentialWriter extends OutputStream
@@ -106,18 +108,18 @@ public class SequentialWriter extends OutputStream
return new SequentialWriter(file, bufferSize, skipIOCache);
}
- public void write(int value) throws IOException
+ public void write(int value) throws ClosedChannelException
{
singleByteBuffer[0] = (byte) value;
write(singleByteBuffer, 0, 1);
}
- public void write(byte[] buffer) throws IOException
+ public void write(byte[] buffer) throws ClosedChannelException
{
write(buffer, 0, buffer.length);
}
- public void write(byte[] data, int offset, int length) throws IOException
+ public void write(byte[] data, int offset, int length) throws ClosedChannelException
{
if (buffer == null)
throw new ClosedChannelException();
@@ -137,7 +139,7 @@ public class SequentialWriter extends OutputStream
* return the number of bytes written. caller is responsible for setting
* isDirty.
*/
- private int writeAtMost(byte[] data, int offset, int length) throws IOException
+ private int writeAtMost(byte[] data, int offset, int length)
{
if (current >= bufferOffset + buffer.length)
reBuffer();
@@ -162,19 +164,25 @@ public class SequentialWriter extends OutputStream
/**
* Synchronize file contents with disk.
- * @throws java.io.IOException on any I/O error.
*/
- public void sync() throws IOException
+ public void sync()
{
syncInternal();
}
- protected void syncDataOnlyInternal() throws IOException
+ protected void syncDataOnlyInternal()
{
- out.getFD().sync();
+ try
+ {
+ out.getFD().sync();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
}
- protected void syncInternal() throws IOException
+ protected void syncInternal()
{
if (syncNeeded)
{
@@ -195,16 +203,14 @@ public class SequentialWriter extends OutputStream
* If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
*
* Currently, for implementation reasons, this also invalidates the buffer.
- *
- * @throws java.io.IOException on any I/O error.
*/
@Override
- public void flush() throws IOException
+ public void flush()
{
flushInternal();
}
- protected void flushInternal() throws IOException
+ protected void flushInternal()
{
if (isDirty)
{
@@ -246,11 +252,19 @@ public class SequentialWriter extends OutputStream
/**
* Override this method instead of overriding flush()
- * @throws IOException on any I/O error.
+ * @throws FSWriteError on any I/O error.
*/
- protected void flushData() throws IOException
+ protected void flushData()
{
- out.write(buffer, 0, validBufferBytes);
+ try
+ {
+ out.write(buffer, 0, validBufferBytes);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+
if (digest != null)
digest.update(buffer, 0, validBufferBytes);
}
@@ -267,14 +281,21 @@ public class SequentialWriter extends OutputStream
* Furthermore, for compressed files, this value refers to compressed data, while the
* writer getFilePointer() refers to uncompressedFile
*/
- public long getOnDiskFilePointer() throws IOException
+ public long getOnDiskFilePointer()
{
return getFilePointer();
}
- public long length() throws IOException
+ public long length()
{
- return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+ try
+ {
+ return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
}
public String getPath()
@@ -282,7 +303,7 @@ public class SequentialWriter extends OutputStream
return filePath;
}
- protected void reBuffer() throws IOException
+ protected void reBuffer()
{
flushInternal();
resetBuffer();
@@ -304,7 +325,7 @@ public class SequentialWriter extends OutputStream
return new BufferedFileWriterMark(current);
}
- public void resetAndTruncate(FileMark mark) throws IOException
+ public void resetAndTruncate(FileMark mark)
{
assert mark instanceof BufferedFileWriterMark;
@@ -325,18 +346,32 @@ public class SequentialWriter extends OutputStream
truncate(current);
// reset channel position
- out.seek(current);
+ try
+ {
+ out.seek(current);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
resetBuffer();
}
- public void truncate(long toSize) throws IOException
+ public void truncate(long toSize)
{
- out.getChannel().truncate(toSize);
+ try
+ {
+ out.getChannel().truncate(toSize);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
}
@Override
- public void close() throws IOException
+ public void close()
{
if (buffer == null)
return; // already closed
@@ -348,7 +383,15 @@ public class SequentialWriter extends OutputStream
if (skipIOCache && bytesSinceCacheFlush > 0)
CLibrary.trySkipCache(fd, 0, 0);
- out.close();
+ try
+ {
+ out.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+
CLibrary.tryCloseFD(directoryFD);
}