You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/17 10:32:20 UTC
[1/3] cassandra git commit: SequentialWriter extends BDOSP
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 136e7002f -> 29687a8bb
refs/heads/trunk 6ae27a320 -> ee4cb86ec
SequentialWriter extends BDOSP
patch by ariel; reviewed by benedict for CASSANDRA-9500
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29687a8b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29687a8b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29687a8b
Branch: refs/heads/cassandra-3.0
Commit: 29687a8bb93a1df637e0ef32d3784e338afeecd9
Parents: 136e700
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Wed Jul 22 18:34:55 2015 -0400
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Aug 17 09:25:58 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 10 +-
.../io/compress/CompressedSequentialWriter.java | 33 +++-
.../io/sstable/format/big/BigTableWriter.java | 6 +-
.../io/util/BufferedDataOutputStreamPlus.java | 104 ++++++----
.../io/util/ChecksummedSequentialWriter.java | 3 +-
.../cassandra/io/util/SequentialWriter.java | 196 +++++--------------
.../org/apache/cassandra/utils/SyncUtil.java | 22 ++-
.../compaction/CompactionAwareWriterTest.java | 9 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../io/util/BufferedDataOutputStreamTest.java | 62 +++++-
.../io/util/BufferedRandomAccessFileTest.java | 6 +-
.../compression/CompressedInputStreamTest.java | 2 +-
13 files changed, 240 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3686a3d..b4b568c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
* Use byte to serialize MT hash length (CASSANDRA-9792)
* Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
* Fix migration to new format from 2.1 SSTable (CASSANDRA-10006)
+ * SequentialWriter should extend BufferedDataOutputStreamPlus (CASSANDRA-9500)
Merged from 2.2:
* Fix histogram overflow exception (CASSANDRA-9973)
* Route gossip messages over dedicated socket (CASSANDRA-9237)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 52fc48f..9eef23e 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -94,10 +94,10 @@ public class ColumnIndex
private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
{
- ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream);
- DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream);
+ ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+ DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
- UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version);
+ UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
}
public ColumnIndex build() throws IOException
@@ -135,7 +135,7 @@ public class ColumnIndex
startPosition = currentPosition();
}
- UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version);
+ UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
lastClustering = unfiltered.clustering();
++written;
@@ -153,7 +153,7 @@ public class ColumnIndex
private ColumnIndex close() throws IOException
{
- UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream);
+ UnfilteredSerializer.serializer.writeEndOfPartition(writer);
// It's possible we add no rows, just a top level deletion
if (written == 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index a4afa3f..8e1ebff 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.io.compress;
+import static org.apache.cassandra.utils.Throwables.merge;
+
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
@@ -31,6 +33,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
@@ -81,7 +84,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- return channel.position();
+ return fchannel.position();
}
catch (IOException e)
{
@@ -130,9 +133,6 @@ public class CompressedSequentialWriter extends SequentialWriter
compressed.rewind();
crcMetadata.appendDirect(compressed, true);
lastFlushOffset += compressedLength + 4;
-
- // adjust our bufferOffset to account for the new uncompressed data we've now written out
- resetBuffer();
}
catch (IOException e)
{
@@ -155,6 +155,8 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
public FileMark mark()
{
+ if (!buffer.hasRemaining())
+ doFlush();
return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
}
@@ -189,8 +191,8 @@ public class CompressedSequentialWriter extends SequentialWriter
{
compressed.clear();
compressed.limit(chunkSize);
- channel.position(chunkOffset);
- channel.read(compressed);
+ fchannel.position(chunkOffset);
+ fchannel.read(compressed);
try
{
@@ -209,7 +211,7 @@ public class CompressedSequentialWriter extends SequentialWriter
checksum.update(compressed);
crcCheckBuffer.clear();
- channel.read(crcCheckBuffer);
+ fchannel.read(crcCheckBuffer);
crcCheckBuffer.flip();
if (crcCheckBuffer.getInt() != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
@@ -229,7 +231,6 @@ public class CompressedSequentialWriter extends SequentialWriter
// Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
buffer.position(realMark.validBufferBytes);
- isDirty = true;
bufferOffset = truncateTarget - buffer.position();
chunkCount = realMark.nextChunkIndex - 1;
@@ -248,7 +249,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- channel.position(chunkOffset);
+ fchannel.position(chunkOffset);
}
catch (IOException e)
{
@@ -281,6 +282,20 @@ public class CompressedSequentialWriter extends SequentialWriter
sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
}
+
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
+ {
+ accumulate = super.doPreCleanup(accumulate);
+ if (compressed != null)
+ {
+ try { FileUtils.clean(compressed); }
+ catch (Throwable t) { accumulate = merge(accumulate, t); }
+ compressed = null;
+ }
+
+ return accumulate;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 2b60479..77bf3d6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -342,7 +342,7 @@ public class BigTableWriter extends SSTableWriter
File file = new File(desc.filenameFor(Component.STATS));
try (SequentialWriter out = SequentialWriter.open(file))
{
- desc.getMetadataSerializer().serialize(components, out.stream);
+ desc.getMetadataSerializer().serialize(components, out);
out.setDescriptor(desc).finish();
}
catch (IOException e)
@@ -407,8 +407,8 @@ public class BigTableWriter extends SSTableWriter
long indexStart = indexFile.getFilePointer();
try
{
- ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
- rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index b6f3231..9434219 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -27,6 +27,8 @@ import java.nio.channels.WritableByteChannel;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import net.nicoulaj.compilecommand.annotations.DontInline;
+
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -41,7 +43,16 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
{
private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
- ByteBuffer buffer;
+ protected ByteBuffer buffer;
+
+ //Lets derived classes declare that writing to the channel directly
+ //must not happen, because they intercept writes via doFlush for things
+ //like compression or checksumming.
+ //As a further hack, this value also indicates that flushing early
+ //should not occur: flushes aligned with the buffer size are desired,
+ //except for the last flush. Compression and checksum formats
+ //expect block (same as buffer size) alignment for everything except the last block.
+ protected boolean strictFlushing = false;
public BufferedDataOutputStreamPlus(RandomAccessFile ras)
{
@@ -142,40 +153,53 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
else
{
assert toWrite.isDirect();
+ MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+
if (toWrite.remaining() > buffer.remaining())
{
- doFlush();
- MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
- if (toWrite.remaining() > buffer.remaining())
+ if (strictFlushing)
{
- while (hollowBuffer.hasRemaining())
- channel.write(hollowBuffer);
+ writeExcessSlow();
}
else
{
- buffer.put(hollowBuffer);
+ doFlush();
+ while (hollowBuffer.remaining() > buffer.capacity())
+ channel.write(hollowBuffer);
}
}
- else
- {
- MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
- buffer.put(hollowBuffer);
- }
+
+ buffer.put(hollowBuffer);
}
}
+ // writes anything we can't fit into the buffer
+ @DontInline
+ private void writeExcessSlow() throws IOException
+ {
+ int originalLimit = hollowBuffer.limit();
+ while (originalLimit - hollowBuffer.position() > buffer.remaining())
+ {
+ hollowBuffer.limit(hollowBuffer.position() + buffer.remaining());
+ buffer.put(hollowBuffer);
+ doFlush();
+ }
+ hollowBuffer.limit(originalLimit);
+ }
@Override
public void write(int b) throws IOException
{
- ensureRemaining(1);
+ if (!buffer.hasRemaining())
+ doFlush();
buffer.put((byte) (b & 0xFF));
}
@Override
public void writeBoolean(boolean v) throws IOException
{
- ensureRemaining(1);
+ if (!buffer.hasRemaining())
+ doFlush();
buffer.put(v ? (byte)1 : (byte)0);
}
@@ -188,29 +212,34 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
@Override
public void writeShort(int v) throws IOException
{
- ensureRemaining(2);
- buffer.putShort((short) v);
+ writeChar(v);
}
@Override
public void writeChar(int v) throws IOException
{
- ensureRemaining(2);
- buffer.putChar((char) v);
+ if (buffer.remaining() < 2)
+ writeSlow(v, 2);
+ else
+ buffer.putChar((char) v);
}
@Override
public void writeInt(int v) throws IOException
{
- ensureRemaining(4);
- buffer.putInt(v);
+ if (buffer.remaining() < 4)
+ writeSlow(v, 4);
+ else
+ buffer.putInt(v);
}
@Override
public void writeLong(long v) throws IOException
{
- ensureRemaining(8);
- buffer.putLong(v);
+ if (buffer.remaining() < 8)
+ writeSlow(v, 8);
+ else
+ buffer.putLong(v);
}
@Override
@@ -225,27 +254,33 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
int size = VIntCoding.computeUnsignedVIntSize(value);
if (size == 1)
{
- ensureRemaining(1);
- buffer.put((byte) value);
+ write((int) value);
return;
}
- ensureRemaining(size);
- buffer.put(VIntCoding.encodeVInt(value, size), 0, size);
+ write(VIntCoding.encodeVInt(value, size), 0, size);
}
@Override
public void writeFloat(float v) throws IOException
{
- ensureRemaining(4);
- buffer.putFloat(v);
+ writeInt(Float.floatToRawIntBits(v));
}
@Override
public void writeDouble(double v) throws IOException
{
- ensureRemaining(8);
- buffer.putDouble(v);
+ writeLong(Double.doubleToRawLongBits(v));
+ }
+
+ @DontInline
+ private void writeSlow(long bytes, int count) throws IOException
+ {
+ int origCount = count;
+ if (ByteOrder.BIG_ENDIAN == buffer.order())
+ while (count > 0) writeByte((int) (bytes >>> (8 * --count)));
+ else
+ while (count > 0) writeByte((int) (bytes >>> (8 * (origCount - count--))));
}
@Override
@@ -275,6 +310,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
write(buffer);
}
+ @DontInline
protected void doFlush() throws IOException
{
buffer.flip();
@@ -300,15 +336,11 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
buffer = null;
}
- protected void ensureRemaining(int minimum) throws IOException
- {
- if (buffer.remaining() < minimum)
- doFlush();
- }
-
@Override
public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
{
+ if (strictFlushing)
+ throw new UnsupportedOperationException();
//Don't allow writes to the underlying channel while data is buffered
flush();
return f.apply(channel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index d5e6be9..8203a37 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -31,10 +31,11 @@ public class ChecksummedSequentialWriter extends SequentialWriter
{
super(file, bufferSize, BufferType.ON_HEAP);
crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP);
- crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter);
crcMetadata.writeChunkSize(buffer.capacity());
}
+ @Override
protected void flushData()
{
super.flushData();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index ddabe89..905a5c6 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -18,10 +18,7 @@
package org.apache.cassandra.io.util;
import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,35 +29,27 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.utils.Throwables.merge;
+
import org.apache.cassandra.utils.SyncUtil;
/**
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
-public class SequentialWriter extends OutputStream implements WritableByteChannel, Transactional
+public class SequentialWriter extends BufferedDataOutputStreamPlus implements Transactional
{
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- // isDirty - true if this.buffer contains any un-synced bytes
- protected boolean isDirty = false, syncNeeded = false;
-
// absolute path to the given file
private final String filePath;
- protected ByteBuffer buffer;
- private int directoryFD;
- // directory should be synced only after first file sync, in other words, only once per file
- private boolean directorySynced = false;
-
// Offset for start of buffer relative to underlying file
protected long bufferOffset;
- protected final FileChannel channel;
+ protected final FileChannel fchannel;
// whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
// latency spikes
@@ -68,7 +57,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private int trickleFsyncByteInterval;
private int bytesSinceTrickleFsync = 0;
- public final DataOutputPlus stream;
protected long lastFlushOffset;
protected Runnable runPostFlush;
@@ -84,13 +72,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
protected Throwable doPreCleanup(Throwable accumulate)
{
- if (directoryFD >= 0)
- {
- try { CLibrary.tryCloseFD(directoryFD); }
- catch (Throwable t) { accumulate = merge(accumulate, t); }
- directoryFD = -1;
- }
-
// close is idempotent
try { channel.close(); }
catch (Throwable t) { accumulate = merge(accumulate, t); }
@@ -127,30 +108,45 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
- public SequentialWriter(File file, int bufferSize, BufferType bufferType)
- {
+ // TODO: we should specify as a parameter if we permit an existing file or not
+ private static FileChannel openChannel(File file) {
try
{
if (file.exists())
- channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ {
+ return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ }
else
- channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+ {
+ FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+ try
+ {
+ SyncUtil.trySyncDir(file.getParentFile());
+ }
+ catch (Throwable t)
+ {
+ try { channel.close(); }
+ catch (Throwable t2) { t.addSuppressed(t2); }
+ }
+ return channel;
+ }
}
catch (IOException e)
{
throw new RuntimeException(e);
}
+ }
- filePath = file.getAbsolutePath();
+ public SequentialWriter(File file, int bufferSize, BufferType bufferType)
+ {
+ super(openChannel(file), bufferType.allocate(bufferSize));
+ strictFlushing = true;
+ fchannel = (FileChannel)channel;
- // Allow children to allocate buffer as direct (snappy compression) if necessary
- buffer = bufferType.allocate(bufferSize);
+ filePath = file.getAbsolutePath();
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
-
- directoryFD = CLibrary.tryOpenDirectory(file.getParent());
- stream = new WrappedDataOutputStreamPlus(this, this);
}
/**
@@ -174,73 +170,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
- public void write(int value) throws ClosedChannelException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- if (!buffer.hasRemaining())
- {
- reBuffer();
- }
-
- buffer.put((byte) value);
-
- isDirty = true;
- syncNeeded = true;
- }
-
- public void write(byte[] buffer) throws IOException
- {
- write(buffer, 0, buffer.length);
- }
-
- public void write(byte[] data, int offset, int length) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int position = offset;
- int remaining = length;
- while (remaining > 0)
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- int toCopy = Math.min(remaining, buffer.remaining());
- buffer.put(data, position, toCopy);
-
- remaining -= toCopy;
- position += toCopy;
-
- isDirty = true;
- syncNeeded = true;
- }
- }
-
- public int write(ByteBuffer src) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int length = src.remaining();
- int finalLimit = src.limit();
- while (src.hasRemaining())
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- if (buffer.remaining() < src.remaining())
- src.limit(src.position() + buffer.remaining());
- buffer.put(src);
- src.limit(finalLimit);
-
- isDirty = true;
- syncNeeded = true;
- }
- return length;
- }
-
/**
* Synchronize file contents with disk.
*/
@@ -253,7 +182,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- SyncUtil.force(channel, false);
+ SyncUtil.force(fchannel, false);
}
catch (IOException e)
{
@@ -261,55 +190,34 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
+ /*
+ * For CompressedSequentialWriter, this is only safe to call before truncation or close.
+ * Otherwise, it will leave a non-uniform size compressed block in the middle of the file,
+ * and the compressed format can't handle that.
+ */
protected void syncInternal()
{
- if (syncNeeded)
- {
- flushInternal();
- syncDataOnlyInternal();
-
- if (!directorySynced)
- {
- SyncUtil.trySync(directoryFD);
- directorySynced = true;
- }
-
- syncNeeded = false;
- }
+ doFlush();
+ syncDataOnlyInternal();
}
- /**
- * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
- *
- * Currently, for implementation reasons, this also invalidates the buffer.
- */
@Override
- public void flush()
+ protected void doFlush()
{
- flushInternal();
- }
+ flushData();
- protected void flushInternal()
- {
- if (isDirty)
+ if (trickleFsync)
{
- flushData();
-
- if (trickleFsync)
+ bytesSinceTrickleFsync += buffer.position();
+ if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
{
- bytesSinceTrickleFsync += buffer.position();
- if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
- {
- syncDataOnlyInternal();
- bytesSinceTrickleFsync = 0;
- }
+ syncDataOnlyInternal();
+ bytesSinceTrickleFsync = 0;
}
-
- // Remember that we wrote, so we don't write it again on next flush().
- resetBuffer();
-
- isDirty = false;
}
+
+ // Remember that we wrote, so we don't write it again on next flush().
+ resetBuffer();
}
public void setPostFlushListener(Runnable runPostFlush)
@@ -361,7 +269,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- return Math.max(current(), channel.size());
+ return Math.max(current(), fchannel.size());
}
catch (IOException e)
{
@@ -374,12 +282,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return filePath;
}
- protected void reBuffer()
- {
- flushInternal();
- resetBuffer();
- }
-
protected void resetBuffer()
{
bufferOffset = current();
@@ -423,7 +325,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
try
{
- channel.position(truncateTarget);
+ fchannel.position(truncateTarget);
}
catch (IOException e)
{
@@ -442,7 +344,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- channel.truncate(toSize);
+ fchannel.truncate(toSize);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/utils/SyncUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SyncUtil.java b/src/java/org/apache/cassandra/utils/SyncUtil.java
index 0e83ba2..b217e29 100644
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@ -1,10 +1,6 @@
package org.apache.cassandra.utils;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.SyncFailedException;
+import java.io.*;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -162,4 +158,20 @@ public class SyncUtil
else
CLibrary.trySync(fd);
}
+
+ public static void trySyncDir(File dir)
+ {
+ if (SKIP_SYNC)
+ return;
+
+ int directoryFD = CLibrary.tryOpenDirectory(dir.getPath());
+ try
+ {
+ trySync(directoryFD);
+ }
+ finally
+ {
+ CLibrary.tryCloseFD(directoryFD);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 6a57327..1b94a6b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -32,10 +32,7 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -183,14 +180,14 @@ public class CompactionAwareWriterTest extends CQLTester
rowsWritten++;
}
}
- Collection<SSTableReader> newSSTables = writer.finish();
+ writer.finish();
return rowsWritten;
}
private void populate(int count) throws Throwable
{
- byte [] payload = new byte[1000];
- new Random().nextBytes(payload);
+ byte [] payload = new byte[5000];
+ new Random(42).nextBytes(payload);
ByteBuffer b = ByteBuffer.wrap(payload);
for (int i = 0; i < count; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 28af0ae..ca15722 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -94,7 +94,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
byte[] rawPost = new byte[bytesToTest];
try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector);)
{
- Random r = new Random();
+ Random r = new Random(42);
// Test both write with byte[] and ByteBuffer
r.nextBytes(dataPre);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index acef1ec..469bccb 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -7,6 +7,7 @@ import java.io.UTFDataFormatException;
import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Random;
@@ -161,12 +162,14 @@ public class BufferedDataOutputStreamTest
private ByteArrayOutputStream canonical;
private DataOutputStreamPlus dosp;
- void setUp()
+ void setUp() throws Exception
{
generated = new ByteArrayOutputStream();
canonical = new ByteArrayOutputStream();
dosp = new WrappedDataOutputStreamPlus(canonical);
+ if (ndosp != null)
+ ndosp.close();
ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
}
@@ -535,4 +538,61 @@ public class BufferedDataOutputStreamTest
for (long v : testValues)
assertEquals(v, bbdi.readUnsignedVInt());
}
+
+ @Test
+ public void testWriteSlowByteOrder() throws Exception
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer(4))
+ {
+ dob.order(ByteOrder.LITTLE_ENDIAN);
+ dob.writeLong(42);
+ assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).order(ByteOrder.LITTLE_ENDIAN).getLong());
+ }
+ }
+
+ @Test
+ public void testWriteExcessSlow() throws Exception
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer(4))
+ {
+ dob.strictFlushing = true;
+ ByteBuffer buf = ByteBuffer.allocateDirect(8);
+ buf.putLong(0, 42);
+ dob.write(buf);
+ assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).getLong());
+ }
+ }
+
+ @Test
+ public void testApplyToChannel() throws Exception
+ {
+ setUp();
+ Object obj = new Object();
+ Object retval = ndosp.applyToChannel( channel -> {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.putLong(0, 42);
+ try
+ {
+ channel.write(buf);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return obj;
+ });
+ assertEquals(obj, retval);
+ assertEquals(42, ByteBuffer.wrap(generated.toByteArray()).getLong());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testApplyToChannelThrowsForMisaligned() throws Exception
+ {
+ setUp();
+ ndosp.strictFlushing = true;
+ ndosp.applyToChannel( channel -> {
+ return null;
+ });
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 3c9aa0e..e051c00 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -467,6 +467,10 @@ public class BufferedRandomAccessFileTest
}
}, AssertionError.class);
+ //This used to throw ClosedChannelException, but now that the writer extends BDOSP it just NPEs on the buffer.
+ //(Writing to a BufferedOutputStream that is closed generates no error at all.)
+ //We allow the NPE to be thrown so that any use after close is caught as a bug. Notably, it won't throw an NPE
+ //for a write of length 0, but that is kind of a corner case.
expectException(new Callable<Object>()
{
public Object call() throws IOException
@@ -474,7 +478,7 @@ public class BufferedRandomAccessFileTest
w.write(generateByteArray(1));
return null;
}
- }, ClosedChannelException.class);
+ }, NullPointerException.class);
try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index e3014c3..fea6d2b 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -71,7 +71,7 @@ public class CompressedInputStreamTest
for (long l = 0L; l < 1000; l++)
{
index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ writer.writeLong(l);
}
writer.finish();
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee4cb86e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee4cb86e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee4cb86e
Branch: refs/heads/trunk
Commit: ee4cb86ec4ca97f51a614dfeb434aa365695ef5c
Parents: 6ae27a3 29687a8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Aug 17 09:26:53 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Aug 17 09:26:53 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 10 +-
.../io/compress/CompressedSequentialWriter.java | 33 +++-
.../io/sstable/format/big/BigTableWriter.java | 6 +-
.../io/util/BufferedDataOutputStreamPlus.java | 104 ++++++----
.../io/util/ChecksummedSequentialWriter.java | 3 +-
.../cassandra/io/util/SequentialWriter.java | 196 +++++--------------
.../org/apache/cassandra/utils/SyncUtil.java | 22 ++-
.../compaction/CompactionAwareWriterTest.java | 9 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../io/util/BufferedDataOutputStreamTest.java | 62 +++++-
.../io/util/BufferedRandomAccessFileTest.java | 6 +-
.../compression/CompressedInputStreamTest.java | 2 +-
13 files changed, 240 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee4cb86e/CHANGES.txt
----------------------------------------------------------------------
[2/3] cassandra git commit: SequentialWriter extends BDOSP
Posted by be...@apache.org.
SequentialWriter extends BDOSP
patch by ariel; reviewed by benedict for CASSANDRA-9500
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29687a8b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29687a8b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29687a8b
Branch: refs/heads/trunk
Commit: 29687a8bb93a1df637e0ef32d3784e338afeecd9
Parents: 136e700
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Wed Jul 22 18:34:55 2015 -0400
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Aug 17 09:25:58 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 10 +-
.../io/compress/CompressedSequentialWriter.java | 33 +++-
.../io/sstable/format/big/BigTableWriter.java | 6 +-
.../io/util/BufferedDataOutputStreamPlus.java | 104 ++++++----
.../io/util/ChecksummedSequentialWriter.java | 3 +-
.../cassandra/io/util/SequentialWriter.java | 196 +++++--------------
.../org/apache/cassandra/utils/SyncUtil.java | 22 ++-
.../compaction/CompactionAwareWriterTest.java | 9 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../io/util/BufferedDataOutputStreamTest.java | 62 +++++-
.../io/util/BufferedRandomAccessFileTest.java | 6 +-
.../compression/CompressedInputStreamTest.java | 2 +-
13 files changed, 240 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3686a3d..b4b568c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
* Use byte to serialize MT hash length (CASSANDRA-9792)
* Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
* Fix migration to new format from 2.1 SSTable (CASSANDRA-10006)
+ * SequentialWriter should extend BufferedDataOutputStreamPlus (CASSANDRA-9500)
Merged from 2.2:
* Fix histogram overflow exception (CASSANDRA-9973)
* Route gossip messages over dedicated socket (CASSANDRA-9237)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 52fc48f..9eef23e 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -94,10 +94,10 @@ public class ColumnIndex
private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
{
- ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer.stream);
- DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer.stream);
+ ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+ DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
- UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer.stream, version);
+ UnfilteredSerializer.serializer.serialize(iterator.staticRow(), header, writer, version);
}
public ColumnIndex build() throws IOException
@@ -135,7 +135,7 @@ public class ColumnIndex
startPosition = currentPosition();
}
- UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version);
+ UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, version);
lastClustering = unfiltered.clustering();
++written;
@@ -153,7 +153,7 @@ public class ColumnIndex
private ColumnIndex close() throws IOException
{
- UnfilteredSerializer.serializer.writeEndOfPartition(writer.stream);
+ UnfilteredSerializer.serializer.writeEndOfPartition(writer);
// It's possible we add no rows, just a top level deletion
if (written == 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index a4afa3f..8e1ebff 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.io.compress;
+import static org.apache.cassandra.utils.Throwables.merge;
+
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
@@ -31,6 +33,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
@@ -81,7 +84,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- return channel.position();
+ return fchannel.position();
}
catch (IOException e)
{
@@ -130,9 +133,6 @@ public class CompressedSequentialWriter extends SequentialWriter
compressed.rewind();
crcMetadata.appendDirect(compressed, true);
lastFlushOffset += compressedLength + 4;
-
- // adjust our bufferOffset to account for the new uncompressed data we've now written out
- resetBuffer();
}
catch (IOException e)
{
@@ -155,6 +155,8 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
public FileMark mark()
{
+ if (!buffer.hasRemaining())
+ doFlush();
return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
}
@@ -189,8 +191,8 @@ public class CompressedSequentialWriter extends SequentialWriter
{
compressed.clear();
compressed.limit(chunkSize);
- channel.position(chunkOffset);
- channel.read(compressed);
+ fchannel.position(chunkOffset);
+ fchannel.read(compressed);
try
{
@@ -209,7 +211,7 @@ public class CompressedSequentialWriter extends SequentialWriter
checksum.update(compressed);
crcCheckBuffer.clear();
- channel.read(crcCheckBuffer);
+ fchannel.read(crcCheckBuffer);
crcCheckBuffer.flip();
if (crcCheckBuffer.getInt() != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
@@ -229,7 +231,6 @@ public class CompressedSequentialWriter extends SequentialWriter
// Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
buffer.position(realMark.validBufferBytes);
- isDirty = true;
bufferOffset = truncateTarget - buffer.position();
chunkCount = realMark.nextChunkIndex - 1;
@@ -248,7 +249,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- channel.position(chunkOffset);
+ fchannel.position(chunkOffset);
}
catch (IOException e)
{
@@ -281,6 +282,20 @@ public class CompressedSequentialWriter extends SequentialWriter
sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
}
+
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
+ {
+ accumulate = super.doPreCleanup(accumulate);
+ if (compressed != null)
+ {
+ try { FileUtils.clean(compressed); }
+ catch (Throwable t) { accumulate = merge(accumulate, t); }
+ compressed = null;
+ }
+
+ return accumulate;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 2b60479..77bf3d6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -342,7 +342,7 @@ public class BigTableWriter extends SSTableWriter
File file = new File(desc.filenameFor(Component.STATS));
try (SequentialWriter out = SequentialWriter.open(file))
{
- desc.getMetadataSerializer().serialize(components, out.stream);
+ desc.getMetadataSerializer().serialize(components, out);
out.setDescriptor(desc).finish();
}
catch (IOException e)
@@ -407,8 +407,8 @@ public class BigTableWriter extends SSTableWriter
long indexStart = indexFile.getFilePointer();
try
{
- ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
- rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index b6f3231..9434219 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -27,6 +27,8 @@ import java.nio.channels.WritableByteChannel;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import net.nicoulaj.compilecommand.annotations.DontInline;
+
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -41,7 +43,16 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
{
private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
- ByteBuffer buffer;
+ protected ByteBuffer buffer;
+
+ //Lets derived classes specify that writing to the channel directly
+ //must not happen, because they intercept writes via doFlush for things
+ //like compression or checksumming.
+ //As a further hack, this value also indicates that flushing early
+ //should not occur: flushes aligned with the buffer size are desired,
+ //unless it's the last flush. Compression and checksum formats
+ //expect block (same as buffer size) alignment for everything except the last block.
+ protected boolean strictFlushing = false;
public BufferedDataOutputStreamPlus(RandomAccessFile ras)
{
@@ -142,40 +153,53 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
else
{
assert toWrite.isDirect();
+ MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+
if (toWrite.remaining() > buffer.remaining())
{
- doFlush();
- MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
- if (toWrite.remaining() > buffer.remaining())
+ if (strictFlushing)
{
- while (hollowBuffer.hasRemaining())
- channel.write(hollowBuffer);
+ writeExcessSlow();
}
else
{
- buffer.put(hollowBuffer);
+ doFlush();
+ while (hollowBuffer.remaining() > buffer.capacity())
+ channel.write(hollowBuffer);
}
}
- else
- {
- MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
- buffer.put(hollowBuffer);
- }
+
+ buffer.put(hollowBuffer);
}
}
+ // writes anything we can't fit into the buffer
+ @DontInline
+ private void writeExcessSlow() throws IOException
+ {
+ int originalLimit = hollowBuffer.limit();
+ while (originalLimit - hollowBuffer.position() > buffer.remaining())
+ {
+ hollowBuffer.limit(hollowBuffer.position() + buffer.remaining());
+ buffer.put(hollowBuffer);
+ doFlush();
+ }
+ hollowBuffer.limit(originalLimit);
+ }
@Override
public void write(int b) throws IOException
{
- ensureRemaining(1);
+ if (!buffer.hasRemaining())
+ doFlush();
buffer.put((byte) (b & 0xFF));
}
@Override
public void writeBoolean(boolean v) throws IOException
{
- ensureRemaining(1);
+ if (!buffer.hasRemaining())
+ doFlush();
buffer.put(v ? (byte)1 : (byte)0);
}
@@ -188,29 +212,34 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
@Override
public void writeShort(int v) throws IOException
{
- ensureRemaining(2);
- buffer.putShort((short) v);
+ writeChar(v);
}
@Override
public void writeChar(int v) throws IOException
{
- ensureRemaining(2);
- buffer.putChar((char) v);
+ if (buffer.remaining() < 2)
+ writeSlow(v, 2);
+ else
+ buffer.putChar((char) v);
}
@Override
public void writeInt(int v) throws IOException
{
- ensureRemaining(4);
- buffer.putInt(v);
+ if (buffer.remaining() < 4)
+ writeSlow(v, 4);
+ else
+ buffer.putInt(v);
}
@Override
public void writeLong(long v) throws IOException
{
- ensureRemaining(8);
- buffer.putLong(v);
+ if (buffer.remaining() < 8)
+ writeSlow(v, 8);
+ else
+ buffer.putLong(v);
}
@Override
@@ -225,27 +254,33 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
int size = VIntCoding.computeUnsignedVIntSize(value);
if (size == 1)
{
- ensureRemaining(1);
- buffer.put((byte) value);
+ write((int) value);
return;
}
- ensureRemaining(size);
- buffer.put(VIntCoding.encodeVInt(value, size), 0, size);
+ write(VIntCoding.encodeVInt(value, size), 0, size);
}
@Override
public void writeFloat(float v) throws IOException
{
- ensureRemaining(4);
- buffer.putFloat(v);
+ writeInt(Float.floatToRawIntBits(v));
}
@Override
public void writeDouble(double v) throws IOException
{
- ensureRemaining(8);
- buffer.putDouble(v);
+ writeLong(Double.doubleToRawLongBits(v));
+ }
+
+ @DontInline
+ private void writeSlow(long bytes, int count) throws IOException
+ {
+ int origCount = count;
+ if (ByteOrder.BIG_ENDIAN == buffer.order())
+ while (count > 0) writeByte((int) (bytes >>> (8 * --count)));
+ else
+ while (count > 0) writeByte((int) (bytes >>> (8 * (origCount - count--))));
}
@Override
@@ -275,6 +310,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
write(buffer);
}
+ @DontInline
protected void doFlush() throws IOException
{
buffer.flip();
@@ -300,15 +336,11 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
buffer = null;
}
- protected void ensureRemaining(int minimum) throws IOException
- {
- if (buffer.remaining() < minimum)
- doFlush();
- }
-
@Override
public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
{
+ if (strictFlushing)
+ throw new UnsupportedOperationException();
//Don't allow writes to the underlying channel while data is buffered
flush();
return f.apply(channel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index d5e6be9..8203a37 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -31,10 +31,11 @@ public class ChecksummedSequentialWriter extends SequentialWriter
{
super(file, bufferSize, BufferType.ON_HEAP);
crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP);
- crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter);
crcMetadata.writeChunkSize(buffer.capacity());
}
+ @Override
protected void flushData()
{
super.flushData();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index ddabe89..905a5c6 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -18,10 +18,7 @@
package org.apache.cassandra.io.util;
import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,35 +29,27 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.utils.Throwables.merge;
+
import org.apache.cassandra.utils.SyncUtil;
/**
* Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also
* fsync incrementally if Config.trickle_fsync is enabled.
*/
-public class SequentialWriter extends OutputStream implements WritableByteChannel, Transactional
+public class SequentialWriter extends BufferedDataOutputStreamPlus implements Transactional
{
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- // isDirty - true if this.buffer contains any un-synced bytes
- protected boolean isDirty = false, syncNeeded = false;
-
// absolute path to the given file
private final String filePath;
- protected ByteBuffer buffer;
- private int directoryFD;
- // directory should be synced only after first file sync, in other words, only once per file
- private boolean directorySynced = false;
-
// Offset for start of buffer relative to underlying file
protected long bufferOffset;
- protected final FileChannel channel;
+ protected final FileChannel fchannel;
// whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
// latency spikes
@@ -68,7 +57,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private int trickleFsyncByteInterval;
private int bytesSinceTrickleFsync = 0;
- public final DataOutputPlus stream;
protected long lastFlushOffset;
protected Runnable runPostFlush;
@@ -84,13 +72,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
@Override
protected Throwable doPreCleanup(Throwable accumulate)
{
- if (directoryFD >= 0)
- {
- try { CLibrary.tryCloseFD(directoryFD); }
- catch (Throwable t) { accumulate = merge(accumulate, t); }
- directoryFD = -1;
- }
-
// close is idempotent
try { channel.close(); }
catch (Throwable t) { accumulate = merge(accumulate, t); }
@@ -127,30 +108,45 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
- public SequentialWriter(File file, int bufferSize, BufferType bufferType)
- {
+ // TODO: we should specify as a parameter if we permit an existing file or not
+ private static FileChannel openChannel(File file) {
try
{
if (file.exists())
- channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ {
+ return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ }
else
- channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+ {
+ FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+ try
+ {
+ SyncUtil.trySyncDir(file.getParentFile());
+ }
+ catch (Throwable t)
+ {
+ try { channel.close(); }
+ catch (Throwable t2) { t.addSuppressed(t2); }
+ }
+ return channel;
+ }
}
catch (IOException e)
{
throw new RuntimeException(e);
}
+ }
- filePath = file.getAbsolutePath();
+ public SequentialWriter(File file, int bufferSize, BufferType bufferType)
+ {
+ super(openChannel(file), bufferType.allocate(bufferSize));
+ strictFlushing = true;
+ fchannel = (FileChannel)channel;
- // Allow children to allocate buffer as direct (snappy compression) if necessary
- buffer = bufferType.allocate(bufferSize);
+ filePath = file.getAbsolutePath();
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
-
- directoryFD = CLibrary.tryOpenDirectory(file.getParent());
- stream = new WrappedDataOutputStreamPlus(this, this);
}
/**
@@ -174,73 +170,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
- public void write(int value) throws ClosedChannelException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- if (!buffer.hasRemaining())
- {
- reBuffer();
- }
-
- buffer.put((byte) value);
-
- isDirty = true;
- syncNeeded = true;
- }
-
- public void write(byte[] buffer) throws IOException
- {
- write(buffer, 0, buffer.length);
- }
-
- public void write(byte[] data, int offset, int length) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int position = offset;
- int remaining = length;
- while (remaining > 0)
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- int toCopy = Math.min(remaining, buffer.remaining());
- buffer.put(data, position, toCopy);
-
- remaining -= toCopy;
- position += toCopy;
-
- isDirty = true;
- syncNeeded = true;
- }
- }
-
- public int write(ByteBuffer src) throws IOException
- {
- if (buffer == null)
- throw new ClosedChannelException();
-
- int length = src.remaining();
- int finalLimit = src.limit();
- while (src.hasRemaining())
- {
- if (!buffer.hasRemaining())
- reBuffer();
-
- if (buffer.remaining() < src.remaining())
- src.limit(src.position() + buffer.remaining());
- buffer.put(src);
- src.limit(finalLimit);
-
- isDirty = true;
- syncNeeded = true;
- }
- return length;
- }
-
/**
* Synchronize file contents with disk.
*/
@@ -253,7 +182,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- SyncUtil.force(channel, false);
+ SyncUtil.force(fchannel, false);
}
catch (IOException e)
{
@@ -261,55 +190,34 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
+ /*
+ * This is only safe to call before truncation or close for CompressedSequentialWriter
+ * Otherwise it will leave a non-uniform size compressed block in the middle of the file
+ * and the compressed format can't handle that.
+ */
protected void syncInternal()
{
- if (syncNeeded)
- {
- flushInternal();
- syncDataOnlyInternal();
-
- if (!directorySynced)
- {
- SyncUtil.trySync(directoryFD);
- directorySynced = true;
- }
-
- syncNeeded = false;
- }
+ doFlush();
+ syncDataOnlyInternal();
}
- /**
- * If buffer is dirty, flush it's contents to the operating system. Does not imply fsync().
- *
- * Currently, for implementation reasons, this also invalidates the buffer.
- */
@Override
- public void flush()
+ protected void doFlush()
{
- flushInternal();
- }
+ flushData();
- protected void flushInternal()
- {
- if (isDirty)
+ if (trickleFsync)
{
- flushData();
-
- if (trickleFsync)
+ bytesSinceTrickleFsync += buffer.position();
+ if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
{
- bytesSinceTrickleFsync += buffer.position();
- if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
- {
- syncDataOnlyInternal();
- bytesSinceTrickleFsync = 0;
- }
+ syncDataOnlyInternal();
+ bytesSinceTrickleFsync = 0;
}
-
- // Remember that we wrote, so we don't write it again on next flush().
- resetBuffer();
-
- isDirty = false;
}
+
+ // Remember that we wrote, so we don't write it again on next flush().
+ resetBuffer();
}
public void setPostFlushListener(Runnable runPostFlush)
@@ -361,7 +269,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- return Math.max(current(), channel.size());
+ return Math.max(current(), fchannel.size());
}
catch (IOException e)
{
@@ -374,12 +282,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
return filePath;
}
- protected void reBuffer()
- {
- flushInternal();
- resetBuffer();
- }
-
protected void resetBuffer()
{
bufferOffset = current();
@@ -423,7 +325,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
try
{
- channel.position(truncateTarget);
+ fchannel.position(truncateTarget);
}
catch (IOException e)
{
@@ -442,7 +344,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- channel.truncate(toSize);
+ fchannel.truncate(toSize);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/src/java/org/apache/cassandra/utils/SyncUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SyncUtil.java b/src/java/org/apache/cassandra/utils/SyncUtil.java
index 0e83ba2..b217e29 100644
--- a/src/java/org/apache/cassandra/utils/SyncUtil.java
+++ b/src/java/org/apache/cassandra/utils/SyncUtil.java
@@ -1,10 +1,6 @@
package org.apache.cassandra.utils;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.SyncFailedException;
+import java.io.*;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -162,4 +158,20 @@ public class SyncUtil
else
CLibrary.trySync(fd);
}
+
+ public static void trySyncDir(File dir)
+ {
+ if (SKIP_SYNC)
+ return;
+
+ int directoryFD = CLibrary.tryOpenDirectory(dir.getPath());
+ try
+ {
+ trySync(directoryFD);
+ }
+ finally
+ {
+ CLibrary.tryCloseFD(directoryFD);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 6a57327..1b94a6b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -32,10 +32,7 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -183,14 +180,14 @@ public class CompactionAwareWriterTest extends CQLTester
rowsWritten++;
}
}
- Collection<SSTableReader> newSSTables = writer.finish();
+ writer.finish();
return rowsWritten;
}
private void populate(int count) throws Throwable
{
- byte [] payload = new byte[1000];
- new Random().nextBytes(payload);
+ byte [] payload = new byte[5000];
+ new Random(42).nextBytes(payload);
ByteBuffer b = ByteBuffer.wrap(payload);
for (int i = 0; i < count; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 28af0ae..ca15722 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -94,7 +94,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
byte[] rawPost = new byte[bytesToTest];
try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector);)
{
- Random r = new Random();
+ Random r = new Random(42);
// Test both write with byte[] and ByteBuffer
r.nextBytes(dataPre);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index acef1ec..469bccb 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -7,6 +7,7 @@ import java.io.UTFDataFormatException;
import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Random;
@@ -161,12 +162,14 @@ public class BufferedDataOutputStreamTest
private ByteArrayOutputStream canonical;
private DataOutputStreamPlus dosp;
- void setUp()
+ void setUp() throws Exception
{
generated = new ByteArrayOutputStream();
canonical = new ByteArrayOutputStream();
dosp = new WrappedDataOutputStreamPlus(canonical);
+ if (ndosp != null)
+ ndosp.close();
ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
}
@@ -535,4 +538,61 @@ public class BufferedDataOutputStreamTest
for (long v : testValues)
assertEquals(v, bbdi.readUnsignedVInt());
}
+
+ @Test
+ public void testWriteSlowByteOrder() throws Exception
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer(4))
+ {
+ dob.order(ByteOrder.LITTLE_ENDIAN);
+ dob.writeLong(42);
+ assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).order(ByteOrder.LITTLE_ENDIAN).getLong());
+ }
+ }
+
+ @Test
+ public void testWriteExcessSlow() throws Exception
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer(4))
+ {
+ dob.strictFlushing = true;
+ ByteBuffer buf = ByteBuffer.allocateDirect(8);
+ buf.putLong(0, 42);
+ dob.write(buf);
+ assertEquals(42, ByteBuffer.wrap(dob.toByteArray()).getLong());
+ }
+ }
+
+ @Test
+ public void testApplyToChannel() throws Exception
+ {
+ setUp();
+ Object obj = new Object();
+ Object retval = ndosp.applyToChannel( channel -> {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.putLong(0, 42);
+ try
+ {
+ channel.write(buf);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return obj;
+ });
+ assertEquals(obj, retval);
+ assertEquals(42, ByteBuffer.wrap(generated.toByteArray()).getLong());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testApplyToChannelThrowsForMisaligned() throws Exception
+ {
+ setUp();
+ ndosp.strictFlushing = true;
+ ndosp.applyToChannel( channel -> {
+ return null;
+ });
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 3c9aa0e..e051c00 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -467,6 +467,10 @@ public class BufferedRandomAccessFileTest
}
}, AssertionError.class);
+ //Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer
+ //Writing to a BufferedOutputStream that is closed generates no error
+ //Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a
+ //write of a 0 length, but that is kind of a corner case
expectException(new Callable<Object>()
{
public Object call() throws IOException
@@ -474,7 +478,7 @@ public class BufferedRandomAccessFileTest
w.write(generateByteArray(1));
return null;
}
- }, ClosedChannelException.class);
+ }, NullPointerException.class);
try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29687a8b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index e3014c3..fea6d2b 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -71,7 +71,7 @@ public class CompressedInputStreamTest
for (long l = 0L; l < 1000; l++)
{
index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ writer.writeLong(l);
}
writer.finish();