You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 14:38:49 UTC
[2/3] cassandra git commit: Remove DatabaseDescriptor dependency from
SegmentedFile
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/FileHandle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java
new file mode 100644
index 0000000..62fe5e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
+/**
+ * {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter}
+ * instances, and it is typically used by {@link org.apache.cassandra.io.sstable.format.SSTableReader}.
+ *
+ * Use {@link FileHandle.Builder} to create an instance, and call {@link #createReader()} (and its variants) to
+ * access the readers for the underlying file.
+ *
+ * You can call {@link Builder#complete(long)} several times during the builder's lifecycle, each time with a different {@code overrideLength} (e.g. when early-opening a file).
+ * For that reason, the builder keeps a reference to the file channel and makes a copy for each {@link Builder#complete()} call.
+ * Therefore, it is important to close the {@link Builder} when it is no longer needed, as well as any {@link FileHandle}
+ * instances.
+ */
+public class FileHandle extends SharedCloseableImpl
+{
+ private static final Logger logger = LoggerFactory.getLogger(FileHandle.class);
+
+ public final ChannelProxy channel;
+
+ public final long onDiskLength;
+
+ /*
+ * Rebufferer factory to use when constructing RandomAccessReaders
+ */
+ private final RebuffererFactory rebuffererFactory;
+
+ /*
+ * Optional CompressionMetadata when dealing with compressed file
+ */
+ private final Optional<CompressionMetadata> compressionMetadata;
+
+ private FileHandle(Cleanup cleanup,
+ ChannelProxy channel,
+ RebuffererFactory rebuffererFactory,
+ CompressionMetadata compressionMetadata,
+ long onDiskLength)
+ {
+ super(cleanup);
+ this.rebuffererFactory = rebuffererFactory;
+ this.channel = channel;
+ this.compressionMetadata = Optional.ofNullable(compressionMetadata);
+ this.onDiskLength = onDiskLength;
+ }
+
+ private FileHandle(FileHandle copy)
+ {
+ super(copy);
+ channel = copy.channel;
+ rebuffererFactory = copy.rebuffererFactory;
+ compressionMetadata = copy.compressionMetadata;
+ onDiskLength = copy.onDiskLength;
+ }
+
+ /**
+ * @return Path to the file this handle is referencing
+ */
+ public String path()
+ {
+ return channel.filePath();
+ }
+
+ public long dataLength()
+ {
+ return compressionMetadata.map(c -> c.dataLength).orElseGet(rebuffererFactory::fileLength);
+ }
+
+ public RebuffererFactory rebuffererFactory()
+ {
+ return rebuffererFactory;
+ }
+
+ public Optional<CompressionMetadata> compressionMetadata()
+ {
+ return compressionMetadata;
+ }
+
+ @Override
+ public void addTo(Ref.IdentityCollection identities)
+ {
+ super.addTo(identities);
+ compressionMetadata.ifPresent(metadata -> metadata.addTo(identities));
+ }
+
+ @Override
+ public FileHandle sharedCopy()
+ {
+ return new FileHandle(this);
+ }
+
+ /**
+ * Create {@link RandomAccessReader} with configured method of reading content of the file.
+ *
+ * @return RandomAccessReader for the file
+ */
+ public RandomAccessReader createReader()
+ {
+ return createReader(null);
+ }
+
+ /**
+ * Create {@link RandomAccessReader} with configured method of reading content of the file.
+ * Reading from file will be rate limited by given {@link RateLimiter}.
+ *
+ * @param limiter RateLimiter to use for rate limiting read
+ * @return RandomAccessReader for the file
+ */
+ public RandomAccessReader createReader(RateLimiter limiter)
+ {
+ return new RandomAccessReader(instantiateRebufferer(limiter));
+ }
+
+ public FileDataInput createReader(long position)
+ {
+ RandomAccessReader reader = createReader();
+ reader.seek(position);
+ return reader;
+ }
+
+ /**
+ * Drop page cache from start to given {@code before}.
+ *
+ * @param before uncompressed position from the start of the file up to which the page cache is dropped; if 0, drop to the end of the file.
+ */
+ public void dropPageCache(long before)
+ {
+ long position = compressionMetadata.map(metadata -> {
+ if (before >= metadata.dataLength)
+ return 0L;
+ else
+ return metadata.chunkFor(before).offset;
+ }).orElse(before);
+ CLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path());
+ }
+
+ private Rebufferer instantiateRebufferer(RateLimiter limiter)
+ {
+ Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
+
+ if (limiter != null)
+ rebufferer = new LimitingRebufferer(rebufferer, limiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE);
+ return rebufferer;
+ }
+
+ /**
+ * Perform clean up of all resources held by {@link FileHandle}.
+ */
+ private static class Cleanup implements RefCounted.Tidy
+ {
+ final ChannelProxy channel;
+ final RebuffererFactory rebufferer;
+ final CompressionMetadata compressionMetadata;
+ final Optional<ChunkCache> chunkCache;
+
+ private Cleanup(ChannelProxy channel,
+ RebuffererFactory rebufferer,
+ CompressionMetadata compressionMetadata,
+ ChunkCache chunkCache)
+ {
+ this.channel = channel;
+ this.rebufferer = rebufferer;
+ this.compressionMetadata = compressionMetadata;
+ this.chunkCache = Optional.ofNullable(chunkCache);
+ }
+
+ public String name()
+ {
+ return channel.filePath();
+ }
+
+ public void tidy()
+ {
+ chunkCache.ifPresent(cache -> cache.invalidateFile(name()));
+ try
+ {
+ if (compressionMetadata != null)
+ {
+ compressionMetadata.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ channel.close();
+ }
+ finally
+ {
+ rebufferer.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Configures how the file will be read (compressed, mmapped, use cache etc.)
+ */
+ public static class Builder implements AutoCloseable
+ {
+ private final String path;
+
+ private ChannelProxy channel;
+ private CompressionMetadata compressionMetadata;
+ private MmappedRegions regions;
+ private ChunkCache chunkCache;
+ private int bufferSize = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+ private BufferType bufferType = BufferType.OFF_HEAP;
+
+ private boolean mmapped = false;
+ private boolean compressed = false;
+
+ public Builder(String path)
+ {
+ this.path = path;
+ }
+
+ public Builder(ChannelProxy channel)
+ {
+ this.channel = channel;
+ this.path = channel.filePath();
+ }
+
+ public Builder compressed(boolean compressed)
+ {
+ this.compressed = compressed;
+ return this;
+ }
+
+ /**
+ * Set {@link ChunkCache} to use.
+ *
+ * @param chunkCache ChunkCache object to use for caching
+ * @return this object
+ */
+ public Builder withChunkCache(ChunkCache chunkCache)
+ {
+ this.chunkCache = chunkCache;
+ return this;
+ }
+
+ /**
+ * Provide {@link CompressionMetadata} to use when reading compressed file.
+ *
+ * @param metadata CompressionMetadata to use
+ * @return this object
+ */
+ public Builder withCompressionMetadata(CompressionMetadata metadata)
+ {
+ this.compressed = Objects.nonNull(metadata);
+ this.compressionMetadata = metadata;
+ return this;
+ }
+
+ /**
+ * Set whether to use mmap for reading
+ *
+ * @param mmapped true if using mmap
+ * @return this instance
+ */
+ public Builder mmapped(boolean mmapped)
+ {
+ this.mmapped = mmapped;
+ return this;
+ }
+
+ /**
+ * Set the buffer size to use (if appropriate).
+ *
+ * @param bufferSize Buffer size in bytes
+ * @return this instance
+ */
+ public Builder bufferSize(int bufferSize)
+ {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ /**
+ * Set the buffer type (on heap or off heap) to use (if appropriate).
+ *
+ * @param bufferType Buffer type to use
+ * @return this instance
+ */
+ public Builder bufferType(BufferType bufferType)
+ {
+ this.bufferType = bufferType;
+ return this;
+ }
+
+ /**
+ * Complete building {@link FileHandle} without overriding file length.
+ *
+ * @see #complete(long)
+ */
+ public FileHandle complete()
+ {
+ return complete(-1L);
+ }
+
+ /**
+ * Complete building {@link FileHandle} with the given length, which overrides the file length.
+ *
+ * @param overrideLength Override file length (in bytes) so that read cannot go further than this value.
+ * If the value is less than or equal to 0, then the value is ignored.
+ * @return Built file
+ */
+ @SuppressWarnings("resource")
+ public FileHandle complete(long overrideLength)
+ {
+ if (channel == null)
+ {
+ channel = new ChannelProxy(path);
+ }
+
+ ChannelProxy channelCopy = channel.sharedCopy();
+ try
+ {
+ if (compressed && compressionMetadata == null)
+ compressionMetadata = CompressionMetadata.create(channelCopy.filePath());
+
+ long length = overrideLength > 0 ? overrideLength : compressed ? compressionMetadata.compressedFileLength : channelCopy.size();
+
+ RebuffererFactory rebuffererFactory;
+ if (mmapped)
+ {
+ if (compressed)
+ {
+ regions = MmappedRegions.map(channelCopy, compressionMetadata);
+ rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channelCopy, compressionMetadata,
+ regions));
+ }
+ else
+ {
+ updateRegions(channelCopy, length);
+ rebuffererFactory = new MmapRebufferer(channelCopy, length, regions.sharedCopy());
+ }
+ }
+ else
+ {
+ regions = null;
+ if (compressed)
+ {
+ rebuffererFactory = maybeCached(new CompressedChunkReader.Standard(channelCopy, compressionMetadata));
+ }
+ else
+ {
+ rebuffererFactory = maybeCached(new SimpleChunkReader(channelCopy, length, bufferType, bufferSize));
+ }
+ }
+ Cleanup cleanup = new Cleanup(channelCopy, rebuffererFactory, compressionMetadata, chunkCache);
+ return new FileHandle(cleanup, channelCopy, rebuffererFactory, compressionMetadata, length);
+ }
+ catch (Throwable t)
+ {
+ channelCopy.close();
+ throw t;
+ }
+ }
+
+ public Throwable close(Throwable accumulate)
+ {
+ if (!compressed && regions != null)
+ accumulate = regions.close(accumulate);
+ if (channel != null)
+ return channel.close(accumulate);
+
+ return accumulate;
+ }
+
+ public void close()
+ {
+ maybeFail(close(null));
+ }
+
+ private RebuffererFactory maybeCached(ChunkReader reader)
+ {
+ if (chunkCache != null && chunkCache.capacity() > 0)
+ return chunkCache.wrap(reader);
+ return reader;
+ }
+
+ private void updateRegions(ChannelProxy channel, long length)
+ {
+ if (regions != null && !regions.isValid(channel))
+ {
+ Throwable err = regions.close(null);
+ if (err != null)
+ logger.error("Failed to close mapped regions", err);
+
+ regions = null;
+ }
+
+ if (regions == null)
+ regions = MmappedRegions.map(channel, length);
+ else
+ regions.extend(length);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(path='" + path() + '\'' +
+ ", length=" + rebuffererFactory.fileLength() +
+ ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
deleted file mode 100644
index e69487c..0000000
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-
-public interface ICompressedFile
-{
- ChannelProxy channel();
- CompressionMetadata getMetadata();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
index 9d79919..8df6370 100644
--- a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -28,7 +28,7 @@ class MmapRebufferer extends AbstractReaderFileProxy implements Rebufferer, Rebu
{
protected final MmappedRegions regions;
- public MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
+ MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
{
super(channel, fileLength);
this.regions = regions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmappedRegions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
index f269b84..9ab8abf 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -91,6 +91,11 @@ public class MmappedRegions extends SharedCloseableImpl
return new MmappedRegions(channel, null, 0);
}
+ /**
+ * @param channel file to map. The MmappedRegions instance will hold a shared copy of the given channel.
+ * @param metadata compression metadata of the file to map
+ * @return new instance
+ */
public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata)
{
if (metadata == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
deleted file mode 100644
index d514bf8..0000000
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.sstable.format.Version;
-
-public class MmappedSegmentedFile extends SegmentedFile
-{
- private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
-
- public MmappedSegmentedFile(ChannelProxy channel, long length, MmappedRegions regions)
- {
- this(channel, new MmapRebufferer(channel, length, regions), length);
- }
-
- public MmappedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
- {
- super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
- }
-
- private MmappedSegmentedFile(MmappedSegmentedFile copy)
- {
- super(copy);
- }
-
- public MmappedSegmentedFile sharedCopy()
- {
- return new MmappedSegmentedFile(this);
- }
-
- /**
- * Overrides the default behaviour to create segments of a maximum size.
- */
- static class Builder extends SegmentedFile.Builder
- {
- private MmappedRegions regions;
-
- Builder()
- {
- super();
- }
-
- public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
- {
- long length = overrideLength > 0 ? overrideLength : channel.size();
- updateRegions(channel, length);
-
- return new MmappedSegmentedFile(channel, length, regions.sharedCopy());
- }
-
- private void updateRegions(ChannelProxy channel, long length)
- {
- if (regions != null && !regions.isValid(channel))
- {
- Throwable err = regions.close(null);
- if (err != null)
- logger.error("Failed to close mapped regions", err);
-
- regions = null;
- }
-
- if (regions == null)
- regions = MmappedRegions.map(channel, length);
- else
- regions.extend(length);
- }
-
- @Override
- public void serializeBounds(DataOutput out, Version version) throws IOException
- {
- if (!version.hasBoundaries())
- return;
-
- super.serializeBounds(out, version);
- out.writeInt(0);
- }
-
- @Override
- public void deserializeBounds(DataInput in, Version version) throws IOException
- {
- if (!version.hasBoundaries())
- return;
-
- super.deserializeBounds(in, version);
- in.skipBytes(in.readInt() * TypeSizes.sizeof(0L));
- }
-
- @Override
- public Throwable close(Throwable accumulate)
- {
- return super.close(regions == null
- ? accumulate
- : regions.close(accumulate));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 725b367..5157eac 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -17,50 +17,37 @@
*/
package org.apache.cassandra.io.util;
-import java.io.*;
-import java.nio.ByteBuffer;
+import java.io.File;
+import java.io.IOException;
import java.nio.ByteOrder;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
-import org.apache.cassandra.utils.memory.BufferPool;
public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
{
// The default buffer size when the client doesn't specify it
public static final int DEFAULT_BUFFER_SIZE = 4096;
- // The maximum buffer size, we will never buffer more than this size. Further,
- // when the limiter is not null, i.e. when throttling is enabled, we read exactly
- // this size, since when throttling the intention is to eventually read everything,
- // see CASSANDRA-8630
- // NOTE: this size is chosen both for historical consistency, as a reasonable upper bound,
- // and because our BufferPool currently has a maximum allocation size of this.
- public static final int MAX_BUFFER_SIZE = 1 << 16; // 64k
-
// offset of the last file mark
- protected long markedPointer;
+ private long markedPointer;
- @VisibleForTesting
final Rebufferer rebufferer;
- BufferHolder bufferHolder = Rebufferer.EMPTY;
+ private BufferHolder bufferHolder = Rebufferer.EMPTY;
- protected RandomAccessReader(Rebufferer rebufferer)
+ /**
+ * Package-private; typically created through {@link FileHandle#createReader()} (and its variants) or {@link #open(File)}.
+ *
+ * @param rebufferer Rebufferer to use
+ */
+ RandomAccessReader(Rebufferer rebufferer)
{
super(Rebufferer.EMPTY.buffer());
this.rebufferer = rebufferer;
}
- public static ByteBuffer allocateBuffer(int size, BufferType bufferType)
- {
- return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
- }
-
/**
* Read data from file starting from current currentOffset to populate buffer.
*/
@@ -72,7 +59,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
reBufferAt(current());
}
- public void reBufferAt(long position)
+ private void reBufferAt(long position)
{
bufferHolder.release();
bufferHolder = rebufferer.rebuffer(position);
@@ -188,11 +175,11 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
/**
* Class to hold a mark to the position of the file
*/
- protected static class BufferedRandomAccessFileMark implements DataPosition
+ private static class BufferedRandomAccessFileMark implements DataPosition
{
final long pointer;
- public BufferedRandomAccessFileMark(long pointer)
+ private BufferedRandomAccessFileMark(long pointer)
{
this.pointer = pointer;
}
@@ -283,139 +270,14 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
return rebufferer.getCrcCheckChance();
}
- protected static Rebufferer instantiateRebufferer(RebuffererFactory fileRebufferer, RateLimiter limiter)
- {
- Rebufferer rebufferer = fileRebufferer.instantiateRebufferer();
-
- if (limiter != null)
- rebufferer = new LimitingRebufferer(rebufferer, limiter, MAX_BUFFER_SIZE);
-
- return rebufferer;
- }
-
- public static RandomAccessReader build(SegmentedFile file, RateLimiter limiter)
- {
- return new RandomAccessReader(instantiateRebufferer(file.rebuffererFactory(), limiter));
- }
-
- public static Builder builder(ChannelProxy channel)
- {
- return new Builder(channel);
- }
-
- public static class Builder
- {
- // The NIO file channel or an empty channel
- public final ChannelProxy channel;
-
- // The size of the buffer for buffered readers
- protected int bufferSize;
-
- // The type of the buffer for buffered readers
- public BufferType bufferType;
-
- // The buffer
- public ByteBuffer buffer;
-
- // An optional limiter that will throttle the amount of data we read
- public RateLimiter limiter;
-
- // The mmap segments for mmap readers
- public MmappedRegions regions;
-
- // Compression for compressed readers
- public CompressionMetadata compression;
-
- public Builder(ChannelProxy channel)
- {
- this.channel = channel;
- this.bufferSize = DEFAULT_BUFFER_SIZE;
- this.bufferType = BufferType.OFF_HEAP;
- }
-
- /** The buffer size is typically already page aligned but if that is not the case
- * make sure that it is a multiple of the page size, 4096. Also limit it to the maximum
- * buffer size unless we are throttling, in which case we may as well read the maximum
- * directly since the intention is to read the full file, see CASSANDRA-8630.
- * */
- private int adjustedBufferSize()
- {
- if (limiter != null)
- return MAX_BUFFER_SIZE;
-
- // should already be a page size multiple but if that's not case round it up
- int wholePageSize = (bufferSize + 4095) & ~4095;
- return Math.min(MAX_BUFFER_SIZE, wholePageSize);
- }
-
- protected Rebufferer createRebufferer()
- {
- return instantiateRebufferer(chunkReader(), limiter);
- }
-
- public RebuffererFactory chunkReader()
- {
- if (compression != null)
- return CompressedSegmentedFile.chunkReader(channel, compression, regions);
- if (regions != null)
- return new MmapRebufferer(channel, -1, regions);
-
- int adjustedSize = adjustedBufferSize();
- return new SimpleChunkReader(channel, -1, bufferType, adjustedSize);
- }
-
- public Builder bufferSize(int bufferSize)
- {
- if (bufferSize <= 0)
- throw new IllegalArgumentException("bufferSize must be positive");
-
- this.bufferSize = bufferSize;
- return this;
- }
-
- public Builder bufferType(BufferType bufferType)
- {
- this.bufferType = bufferType;
- return this;
- }
-
- public Builder regions(MmappedRegions regions)
- {
- this.regions = regions;
- return this;
- }
-
- public Builder compression(CompressionMetadata metadata)
- {
- this.compression = metadata;
- return this;
- }
-
- public Builder limiter(RateLimiter limiter)
- {
- this.limiter = limiter;
- return this;
- }
-
- public RandomAccessReader build()
- {
- return new RandomAccessReader(createRebufferer());
- }
-
- public RandomAccessReader buildWithChannel()
- {
- return new RandomAccessReaderWithOwnChannel(createRebufferer());
- }
- }
-
// A wrapper of the RandomAccessReader that closes the channel when done.
// For performance reasons RAR does not increase the reference count of
// a channel but assumes the owner will keep it open and close it,
// see CASSANDRA-9379, this thin class is just for those cases where we do
// not have a shared channel.
- public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
+ static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
{
- protected RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
+ RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
{
super(rebufferer);
}
@@ -441,14 +303,26 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
}
}
+ /**
+ * Open a RandomAccessReader (not compressed, not mmapped, no read throttling) that will own its channel.
+ *
+ * @param file File to open for reading
+ * @return new RandomAccessReader that owns the channel opened in this method.
+ */
@SuppressWarnings("resource")
public static RandomAccessReader open(File file)
{
- return new Builder(new ChannelProxy(file)).buildWithChannel();
- }
-
- public static RandomAccessReader open(ChannelProxy channel)
- {
- return new Builder(channel).build();
+ ChannelProxy channel = new ChannelProxy(file);
+ try
+ {
+ ChunkReader reader = new SimpleChunkReader(channel, -1, BufferType.OFF_HEAP, DEFAULT_BUFFER_SIZE);
+ Rebufferer rebufferer = reader.instantiateRebufferer();
+ return new RandomAccessReaderWithOwnChannel(rebufferer);
+ }
+ catch (Throwable t)
+ {
+ channel.close();
+ throw t;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/Rebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Rebufferer.java b/src/java/org/apache/cassandra/io/util/Rebufferer.java
index e88c7cb..2fc7ffa 100644
--- a/src/java/org/apache/cassandra/io/util/Rebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/Rebufferer.java
@@ -38,7 +38,7 @@ public interface Rebufferer extends ReaderFileProxy
*/
void closeReader();
- public interface BufferHolder
+ interface BufferHolder
{
/**
* Returns a useable buffer (i.e. one whose position and limit can be freely modified). Its limit will be set
@@ -59,7 +59,7 @@ public interface Rebufferer extends ReaderFileProxy
void release();
}
- static final BufferHolder EMPTY = new BufferHolder()
+ BufferHolder EMPTY = new BufferHolder()
{
final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
deleted file mode 100644
index 62e14ba..0000000
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.IndexSummary;
-import org.apache.cassandra.io.sstable.IndexSummaryBuilder;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.concurrent.RefCounted;
-import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
-
-import static org.apache.cassandra.utils.Throwables.maybeFail;
-
-/**
- * Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
- * FileDataInput. Allows for iteration over the FileDataInputs, or random access to the FileDataInput for a given
- * position.
- *
- * The JVM can only map up to 2GB at a time, so each segment is at most that size when using mmap i/o. If a segment
- * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
- * each access to that segment.
- */
-public abstract class SegmentedFile extends SharedCloseableImpl
-{
- public final ChannelProxy channel;
-
- // This differs from length for compressed files (but we still need length for
- // SegmentIterator because offsets in the file are relative to the uncompressed size)
- public final long onDiskLength;
-
- /**
- * Rebufferer to use to construct RandomAccessReaders.
- */
- private final RebuffererFactory rebufferer;
-
- protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, RebuffererFactory rebufferer, long onDiskLength)
- {
- super(cleanup);
- this.rebufferer = rebufferer;
- this.channel = channel;
- this.onDiskLength = onDiskLength;
- }
-
- protected SegmentedFile(SegmentedFile copy)
- {
- super(copy);
- channel = copy.channel;
- rebufferer = copy.rebufferer;
- onDiskLength = copy.onDiskLength;
- }
-
- public String path()
- {
- return channel.filePath();
- }
-
- public long dataLength()
- {
- return rebufferer.fileLength();
- }
-
- public RebuffererFactory rebuffererFactory()
- {
- return rebufferer;
- }
-
- protected static class Cleanup implements RefCounted.Tidy
- {
- final ChannelProxy channel;
- final ReaderFileProxy rebufferer;
- protected Cleanup(ChannelProxy channel, ReaderFileProxy rebufferer)
- {
- this.channel = channel;
- this.rebufferer = rebufferer;
- }
-
- public String name()
- {
- return channel.filePath();
- }
-
- public void tidy()
- {
- try
- {
- channel.close();
- }
- finally
- {
- rebufferer.close();
- }
- }
- }
-
- public abstract SegmentedFile sharedCopy();
-
- public RandomAccessReader createReader()
- {
- return RandomAccessReader.build(this, null);
- }
-
- public RandomAccessReader createReader(RateLimiter limiter)
- {
- return RandomAccessReader.build(this, limiter);
- }
-
- public FileDataInput createReader(long position)
- {
- RandomAccessReader reader = createReader();
- reader.seek(position);
- return reader;
- }
-
- public void dropPageCache(long before)
- {
- CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path());
- }
-
- /**
- * @return A SegmentedFile.Builder.
- */
- public static Builder getBuilder(Config.DiskAccessMode mode, boolean compressed)
- {
- return compressed ? new CompressedSegmentedFile.Builder(null)
- : mode == Config.DiskAccessMode.mmap ? new MmappedSegmentedFile.Builder()
- : new BufferedSegmentedFile.Builder();
- }
-
- public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
- {
- return new CompressedSegmentedFile.Builder(writer);
- }
-
- /**
- * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
- */
- public static abstract class Builder implements AutoCloseable
- {
- private ChannelProxy channel;
-
- /**
- * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
- * @param channel The channel to the file on disk.
- */
- protected abstract SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength);
-
- @SuppressWarnings("resource") // SegmentedFile owns channel
- private SegmentedFile complete(String path, int bufferSize, long overrideLength)
- {
- ChannelProxy channelCopy = getChannel(path);
- try
- {
- return complete(channelCopy, bufferSize, overrideLength);
- }
- catch (Throwable t)
- {
- channelCopy.close();
- throw t;
- }
- }
-
- public SegmentedFile buildData(Descriptor desc, StatsMetadata stats, IndexSummaryBuilder.ReadableBoundary boundary)
- {
- return complete(desc.filenameFor(Component.DATA), bufferSize(stats), boundary.dataLength);
- }
-
- public SegmentedFile buildData(Descriptor desc, StatsMetadata stats)
- {
- return complete(desc.filenameFor(Component.DATA), bufferSize(stats), -1L);
- }
-
- public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary, IndexSummaryBuilder.ReadableBoundary boundary)
- {
- return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), boundary.indexLength);
- }
-
- public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary)
- {
- return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L);
- }
-
- private static int bufferSize(StatsMetadata stats)
- {
- return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
- }
-
- private static int bufferSize(Descriptor desc, IndexSummary indexSummary)
- {
- File file = new File(desc.filenameFor(Component.PRIMARY_INDEX));
- return bufferSize(file.length() / indexSummary.size());
- }
-
- /**
- Return the buffer size for a given record size. For spinning disks always add one page.
- For solid state disks only add one page if the chance of crossing to the next page is more
- than a predifined value, @see Config.disk_optimization_page_cross_chance.
- */
- static int bufferSize(long recordSize)
- {
- Config.DiskOptimizationStrategy strategy = DatabaseDescriptor.getDiskOptimizationStrategy();
- if (strategy == Config.DiskOptimizationStrategy.ssd)
- {
- // The crossing probability is calculated assuming a uniform distribution of record
- // start position in a page, so it's the record size modulo the page size divided by
- // the total page size.
- double pageCrossProbability = (recordSize % 4096) / 4096.;
- // if the page cross probability is equal or bigger than disk_optimization_page_cross_chance we add one page
- if ((pageCrossProbability - DatabaseDescriptor.getDiskOptimizationPageCrossChance()) > -1e-16)
- recordSize += 4096;
-
- return roundBufferSize(recordSize);
- }
- else if (strategy == Config.DiskOptimizationStrategy.spinning)
- {
- return roundBufferSize(recordSize + 4096);
- }
- else
- {
- throw new IllegalStateException("Unsupported disk optimization strategy: " + strategy);
- }
- }
-
- /**
- Round up to the next multiple of 4k but no more than 64k
- */
- static int roundBufferSize(long size)
- {
- if (size <= 0)
- return 4096;
-
- size = (size + 4095) & ~4095;
- return (int)Math.min(size, 1 << 16);
- }
-
- public void serializeBounds(DataOutput out, Version version) throws IOException
- {
- if (!version.hasBoundaries())
- return;
-
- out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
- }
-
- public void deserializeBounds(DataInput in, Version version) throws IOException
- {
- if (!version.hasBoundaries())
- return;
-
- if (!in.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
- throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
- }
-
- public Throwable close(Throwable accumulate)
- {
- if (channel != null)
- return channel.close(accumulate);
-
- return accumulate;
- }
-
- public void close()
- {
- maybeFail(close(null));
- }
-
- private ChannelProxy getChannel(String path)
- {
- if (channel != null)
- {
- // This is really fragile, both path and channel.filePath()
- // must agree, i.e. they both must be absolute or both relative
- // eventually we should really pass the filePath to the builder
- // constructor and remove this
- if (channel.filePath().equals(path))
- return channel.sharedCopy();
- else
- channel.close();
- }
-
- channel = new ChannelProxy(path);
- return channel.sharedCopy();
- }
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "(path='" + path() + '\'' +
- ", length=" + rebufferer.fileLength() +
- ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
index 7bfb57b..bc1a529 100644
--- a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -27,7 +27,7 @@ class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
private final int bufferSize;
private final BufferType bufferType;
- public SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType bufferType, int bufferSize)
+ SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType bufferType, int bufferSize)
{
super(channel, fileLength);
this.bufferSize = bufferSize;
@@ -55,15 +55,9 @@ class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
}
@Override
- public boolean alignmentRequired()
- {
- return false;
- }
-
- @Override
public Rebufferer instantiateRebufferer()
{
- return BufferManagingRebufferer.on(this);
+ return new BufferManagingRebufferer.Unaligned(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
new file mode 100644
index 0000000..5cec282
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public class SpinningDiskOptimizationStrategy implements DiskOptimizationStrategy
+{
+ /**
+ * For spinning disks always add one page.
+ */
+ @Override
+ public int bufferSize(long recordSize)
+ {
+ return roundBufferSize(recordSize + 4096);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
new file mode 100644
index 0000000..032ec2b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public class SsdDiskOptimizationStrategy implements DiskOptimizationStrategy
+{
+ private final double diskOptimizationPageCrossChance;
+
+ public SsdDiskOptimizationStrategy(double diskOptimizationPageCrossChance)
+ {
+ this.diskOptimizationPageCrossChance = diskOptimizationPageCrossChance;
+ }
+
+ /**
+ * For solid state disks only add one page if the chance of crossing to the next page is at least
+ * a predefined value.
+ *
+ * @see org.apache.cassandra.config.Config#disk_optimization_page_cross_chance
+ */
+ @Override
+ public int bufferSize(long recordSize)
+ {
+ // The crossing probability is calculated assuming a uniform distribution of record
+ // start position in a page, so it's the record size modulo the page size divided by
+ // the total page size.
+ double pageCrossProbability = (recordSize % 4096) / 4096.;
+ // if the page cross probability is greater than or equal to disk_optimization_page_cross_chance, we add one page
+ if ((pageCrossProbability - diskOptimizationPageCrossChance) > -1e-16)
+ recordSize += 4096;
+
+ return roundBufferSize(recordSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index a406290..7f9de55 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -34,16 +34,14 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -62,7 +60,7 @@ public class MockSchema
public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
public static final IndexSummary indexSummary;
- private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+ private static final FileHandle RANDOM_ACCESS_READER_FACTORY = new FileHandle.Builder(temp("mocksegmentedfile").getAbsolutePath()).complete();
public static Memtable memtable(ColumnFamilyStore cfs)
{
@@ -89,7 +87,7 @@ public class MockSchema
Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
cfs.keyspace.getName(),
cfs.getColumnFamilyName(),
- generation);
+ generation, SSTableFormat.Type.BIG);
Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
for (Component component : components)
{
@@ -122,7 +120,7 @@ public class MockSchema
.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
- segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
+ RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
reader.first = reader.last = readerBounds(generation);
if (!keepRef)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index af43152..5d01886 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -350,8 +351,9 @@ public class ColumnFamilyStoreTest
for (int version = 1; version <= 2; ++version)
{
- Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
- Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
+ Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version,
+ SSTableFormat.Type.BIG);
+ Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version, SSTableFormat.Type.BIG);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index f8d01a8..9a1c0bd 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.DefaultFSErrorHandler;
@@ -117,7 +118,7 @@ public class DirectoriesTest
private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException
{
- Descriptor desc = new Descriptor(dir, KS, cf, gen);
+ Descriptor desc = new Descriptor(dir, KS, cf, gen, SSTableFormat.Type.BIG);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
{
File f = new File(desc.filenameFor(c));
@@ -152,7 +153,7 @@ public class DirectoriesTest
Directories directories = new Directories(cfm);
assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, SSTableFormat.Type.BIG);
File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
@@ -186,8 +187,8 @@ public class DirectoriesTest
{
assertEquals(cfDir(INDEX_CFM), dir);
}
- Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0);
- Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0);
+ Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0, SSTableFormat.Type.BIG);
+ Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0, SSTableFormat.Type.BIG);
// snapshot dir should be created under its parent's
File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, "test");
@@ -204,9 +205,9 @@ public class DirectoriesTest
indexDirectories.snapshotCreationTime("test"));
// check true snapshot size
- Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0);
+ Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0, SSTableFormat.Type.BIG);
createFile(parentSnapshot.filenameFor(Component.DATA), 30);
- Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0);
+ Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0, SSTableFormat.Type.BIG);
createFile(indexSnapshot.filenameFor(Component.DATA), 40);
assertEquals(30, parentDirectories.trueSnapshotsSize());
@@ -354,7 +355,7 @@ public class DirectoriesTest
final String n = Long.toString(System.nanoTime());
Callable<File> directoryGetter = new Callable<File>() {
public File call() throws Exception {
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, SSTableFormat.Type.BIG);
return Directories.getSnapshotDirectory(desc, n);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 79eb449..3f43b51 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -52,7 +52,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.Version;
@@ -803,7 +802,7 @@ public class RowIndexEntryTest extends CQLTester
RowIndexEntry rie = new RowIndexEntry(0L)
{
- public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+ public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
return new IndexInfoRetriever()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 5ed22e4..2620a31 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -46,15 +46,13 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -1156,7 +1154,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
private static SSTableReader sstable(File dataFolder, ColumnFamilyStore cfs, int generation, int size) throws IOException
{
- Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation);
+ Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation, SSTableFormat.Type.BIG);
Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
for (Component component : components)
{
@@ -1169,8 +1167,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
}
}
- SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
- SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+ FileHandle dFile = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).complete();
+ FileHandle iFile = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).complete();
SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 309083b..2de6b62 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -70,39 +70,38 @@ public class CompressedRandomAccessReaderTest
{
File f = File.createTempFile("compressed6791_", "3");
String filename = f.getAbsolutePath();
- try(ChannelProxy channel = new ChannelProxy(f))
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+ try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
+ null, SequentialWriterOption.DEFAULT,
+ CompressionParams.snappy(32),
+ sstableMetadataCollector))
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
- try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
- null, SequentialWriterOption.DEFAULT,
- CompressionParams.snappy(32),
- sstableMetadataCollector))
- {
-
- for (int i = 0; i < 20; i++)
- writer.write("x".getBytes());
+ for (int i = 0; i < 20; i++)
+ writer.write("x".getBytes());
- DataPosition mark = writer.mark();
- // write enough garbage to create new chunks:
- for (int i = 0; i < 40; ++i)
- writer.write("y".getBytes());
+ DataPosition mark = writer.mark();
+ // write enough garbage to create new chunks:
+ for (int i = 0; i < 40; ++i)
+ writer.write("y".getBytes());
- writer.resetAndTruncate(mark);
+ writer.resetAndTruncate(mark);
- for (int i = 0; i < 20; i++)
- writer.write("x".getBytes());
- writer.finish();
- }
+ for (int i = 0; i < 20; i++)
+ writer.write("x".getBytes());
+ writer.finish();
+ }
- try(RandomAccessReader reader = RandomAccessReader.builder(channel)
- .compression(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
- .build())
- {
- String res = reader.readLine();
- assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
- assertEquals(40, res.length());
- }
+ try (FileHandle.Builder builder = new FileHandle.Builder(filename)
+ .withCompressionMetadata(new CompressionMetadata(filename + ".metadata",
+ f.length(),
+ ChecksumType.CRC32));
+ FileHandle fh = builder.complete();
+ RandomAccessReader reader = fh.createReader())
+ {
+ String res = reader.readLine();
+ assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+ assertEquals(40, res.length());
}
finally
{
@@ -117,56 +116,39 @@ public class CompressedRandomAccessReaderTest
private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize) throws IOException
{
final String filename = f.getAbsolutePath();
- try(ChannelProxy channel = new ChannelProxy(f))
- {
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
- try(SequentialWriter writer = compressed
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+ try(SequentialWriter writer = compressed
? new CompressedSequentialWriter(f, filename + ".metadata",
- null, SequentialWriterOption.DEFAULT,
- CompressionParams.snappy(), sstableMetadataCollector)
+ null, SequentialWriterOption.DEFAULT,
+ CompressionParams.snappy(), sstableMetadataCollector)
: new SequentialWriter(f))
- {
- writer.write("The quick ".getBytes());
- DataPosition mark = writer.mark();
- writer.write("blue fox jumps over the lazy dog".getBytes());
-
- // write enough to be sure to change chunk
- for (int i = 0; i < junkSize; ++i)
- {
- writer.write((byte) 1);
- }
-
- writer.resetAndTruncate(mark);
- writer.write("brown fox jumps over the lazy dog".getBytes());
- writer.finish();
- }
- assert f.exists();
-
- CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
- RandomAccessReader.Builder builder = RandomAccessReader.builder(channel);
- if (compressed)
- builder.compression(compressionMetadata);
+ {
+ writer.write("The quick ".getBytes());
+ DataPosition mark = writer.mark();
+ writer.write("blue fox jumps over the lazy dog".getBytes());
- MmappedRegions regions = null;
- if (usemmap)
+ // write enough to be sure to change chunk
+ for (int i = 0; i < junkSize; ++i)
{
- regions = compressed
- ? MmappedRegions.map(channel, compressionMetadata)
- : MmappedRegions.map(channel, f.length());
- builder.regions(regions);
+ writer.write((byte) 1);
}
- try(RandomAccessReader reader = builder.build())
- {
- String expected = "The quick brown fox jumps over the lazy dog";
- assertEquals(expected.length(), reader.length());
- byte[] b = new byte[expected.length()];
- reader.readFully(b);
- assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
- }
+ writer.resetAndTruncate(mark);
+ writer.write("brown fox jumps over the lazy dog".getBytes());
+ writer.finish();
+ }
+ assert f.exists();
- if (regions != null)
- regions.close();
+ CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
+ try (FileHandle.Builder builder = new FileHandle.Builder(filename).mmapped(usemmap).withCompressionMetadata(compressionMetadata);
+ FileHandle fh = builder.complete();
+ RandomAccessReader reader = fh.createReader())
+ {
+ String expected = "The quick brown fox jumps over the lazy dog";
+ assertEquals(expected.length(), reader.length());
+ byte[] b = new byte[expected.length()];
+ reader.readFully(b);
+ assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
}
finally
{
@@ -201,70 +183,69 @@ public class CompressedRandomAccessReaderTest
writer.finish();
}
- try(ChannelProxy channel = new ChannelProxy(file))
- {
- // open compression metadata and get chunk information
- CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
- CompressionMetadata.Chunk chunk = meta.chunkFor(0);
+ // open compression metadata and get chunk information
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
+ CompressionMetadata.Chunk chunk = meta.chunkFor(0);
- try(RandomAccessReader reader = RandomAccessReader.builder(channel).compression(meta).build())
- {// read and verify compressed data
- assertEquals(CONTENT, reader.readLine());
+ try (FileHandle.Builder builder = new FileHandle.Builder(file.getPath()).withCompressionMetadata(meta);
+ FileHandle fh = builder.complete();
+ RandomAccessReader reader = fh.createReader())
+ {// read and verify compressed data
+ assertEquals(CONTENT, reader.readLine());
- Random random = new Random();
- RandomAccessFile checksumModifier = null;
+ Random random = new Random();
+ RandomAccessFile checksumModifier = null;
- try
+ try
+ {
+ checksumModifier = new RandomAccessFile(file, "rw");
+ byte[] checksum = new byte[4];
+
+ // seek to the end of the compressed chunk
+ checksumModifier.seek(chunk.length);
+ // read checksum bytes
+ checksumModifier.read(checksum);
+ // seek back to the chunk end
+ checksumModifier.seek(chunk.length);
+
+ // lets modify one byte of the checksum on each iteration
+ for (int i = 0; i < checksum.length; i++)
{
- checksumModifier = new RandomAccessFile(file, "rw");
- byte[] checksum = new byte[4];
-
- // seek to the end of the compressed chunk
- checksumModifier.seek(chunk.length);
- // read checksum bytes
- checksumModifier.read(checksum);
- // seek back to the chunk end
- checksumModifier.seek(chunk.length);
+ checksumModifier.write(random.nextInt());
+ SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
- // lets modify one byte of the checksum on each iteration
- for (int i = 0; i < checksum.length; i++)
+ try (final RandomAccessReader r = fh.createReader())
{
- checksumModifier.write(random.nextInt());
- SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
-
- try (final RandomAccessReader r = RandomAccessReader.builder(channel).compression(meta).build())
+ Throwable exception = null;
+ try
+ {
+ r.readLine();
+ }
+ catch (Throwable t)
{
- Throwable exception = null;
- try
- {
- r.readLine();
- }
- catch (Throwable t)
- {
- exception = t;
- }
- assertNotNull(exception);
- assertSame(exception.getClass(), CorruptSSTableException.class);
- assertSame(exception.getCause().getClass(), CorruptBlockException.class);
+ exception = t;
}
+ assertNotNull(exception);
+ assertSame(exception.getClass(), CorruptSSTableException.class);
+ assertSame(exception.getCause().getClass(), CorruptBlockException.class);
}
+ }
- // lets write original checksum and check if we can read data
- updateChecksum(checksumModifier, chunk.length, checksum);
+ // lets write original checksum and check if we can read data
+ updateChecksum(checksumModifier, chunk.length, checksum);
- try (RandomAccessReader cr = RandomAccessReader.builder(channel).compression(meta).build())
- {
- // read and verify compressed data
- assertEquals(CONTENT, cr.readLine());
- // close reader
- }
- }
- finally
+ try (RandomAccessReader cr = fh.createReader())
{
- if (checksumModifier != null)
- checksumModifier.close();
+ // read and verify compressed data
+ assertEquals(CONTENT, cr.readLine());
+ // close reader
}
}
+ finally
+ {
+ if (checksumModifier != null)
+ checksumModifier.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 9959c7b..fa9643b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,43 +81,42 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
private void testWrite(File f, int bytesToTest) throws IOException
{
final String filename = f.getAbsolutePath();
- final ChannelProxy channel = new ChannelProxy(f);
-
- try
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Collections.singletonList(BytesType.instance)));
+
+ byte[] dataPre = new byte[bytesToTest];
+ byte[] rawPost = new byte[bytesToTest];
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
+ null, SequentialWriterOption.DEFAULT,
+ compressionParameters,
+ sstableMetadataCollector))
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance)));
-
- byte[] dataPre = new byte[bytesToTest];
- byte[] rawPost = new byte[bytesToTest];
- try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
- null, SequentialWriterOption.DEFAULT,
- compressionParameters,
- sstableMetadataCollector))
+ Random r = new Random(42);
+
+ // Test both write with byte[] and ByteBuffer
+ r.nextBytes(dataPre);
+ r.nextBytes(rawPost);
+ ByteBuffer dataPost = makeBB(bytesToTest);
+ dataPost.put(rawPost);
+ dataPost.flip();
+
+ writer.write(dataPre);
+ DataPosition mark = writer.mark();
+
+ // Write enough garbage to transition chunk
+ for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
{
- Random r = new Random(42);
-
- // Test both write with byte[] and ByteBuffer
- r.nextBytes(dataPre);
- r.nextBytes(rawPost);
- ByteBuffer dataPost = makeBB(bytesToTest);
- dataPost.put(rawPost);
- dataPost.flip();
-
- writer.write(dataPre);
- DataPosition mark = writer.mark();
-
- // Write enough garbage to transition chunk
- for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
- {
- writer.write((byte)i);
- }
- writer.resetAndTruncate(mark);
- writer.write(dataPost);
- writer.finish();
+ writer.write((byte)i);
}
+ writer.resetAndTruncate(mark);
+ writer.write(dataPost);
+ writer.finish();
+ }
- assert f.exists();
- RandomAccessReader reader = RandomAccessReader.builder(channel).compression(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build();
+ assert f.exists();
+ try (FileHandle.Builder builder = new FileHandle.Builder(filename).withCompressionMetadata(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
+ FileHandle fh = builder.complete();
+ RandomAccessReader reader = fh.createReader())
+ {
assertEquals(dataPre.length + rawPost.length, reader.length());
byte[] result = new byte[(int)reader.length()];
@@ -134,9 +132,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
}
finally
{
- // cleanup
- channel.close();
-
if (f.exists())
f.delete();
File metadata = new File(f + ".metadata");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index f769293..6226ee6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -77,14 +77,14 @@ public class DescriptorTest
private void testFromFilenameFor(File dir)
{
// normal
- checkFromFilename(new Descriptor(dir, ksname, cfname, 1), false);
+ checkFromFilename(new Descriptor(dir, ksname, cfname, 1, SSTableFormat.Type.BIG), false);
// skip component (for streaming lock file)
- checkFromFilename(new Descriptor(dir, ksname, cfname, 2), true);
+ checkFromFilename(new Descriptor(dir, ksname, cfname, 2, SSTableFormat.Type.BIG), true);
// secondary index
String idxName = "myidx";
File idxDir = new File(dir.getAbsolutePath() + File.separator + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName);
- checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4), false);
+ checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4, SSTableFormat.Type.BIG), false);
// legacy version
checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, SSTableFormat.Type.LEGACY), false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 151a995..0928ad4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -31,7 +31,6 @@ import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -47,7 +46,6 @@ import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.MmappedRegions;
-import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.CacheService;
@@ -393,13 +391,7 @@ public class SSTableReaderTest
SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
assert sstable.first.getToken() instanceof LocalToken;
- try (SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(),
- false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(),
- sstable.compression))
- {
- sstable.saveSummary(ibuilder, dbuilder);
- }
+ sstable.saveSummary();
SSTableReader reopened = SSTableReader.open(sstable.descriptor);
assert reopened.first.getToken() instanceof LocalToken;
reopened.selfRef().release();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 5c7ff02..df9d1aa 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
@@ -76,7 +77,7 @@ public class SSTableUtils
File cfDir = new File(tempdir, keyspaceName + File.separator + cfname);
cfDir.mkdirs();
cfDir.deleteOnExit();
- File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation).filenameFor(Component.DATA));
+ File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation, SSTableFormat.Type.BIG).filenameFor(Component.DATA));
if (!datafile.createNewFile())
throw new IOException("unable to create file " + datafile);
datafile.deleteOnExit();