You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 14:38:50 UTC
[3/3] cassandra git commit: Remove DatabaseDescriptor dependency from SegmentedFile
Remove DatabaseDescriptor dependency from SegmentedFile
patch by yukim; reviewed by Stefania Alborghetti for CASSANDRA-11580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4133f38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4133f38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4133f38
Branch: refs/heads/trunk
Commit: b4133f38d5ef5fc50047eb4a31307ac97c5b72ee
Parents: a4e7387
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 06:08:17 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 09:36:40 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 4 +-
.../org/apache/cassandra/cache/ChunkCache.java | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 35 +-
.../org/apache/cassandra/db/RowIndexEntry.java | 10 +-
.../columniterator/AbstractSSTableIterator.java | 8 +-
.../db/columniterator/SSTableIterator.java | 4 +-
.../columniterator/SSTableReversedIterator.java | 4 +-
.../cassandra/db/commitlog/CommitLogReader.java | 3 +-
.../apache/cassandra/io/sstable/Descriptor.java | 15 +-
.../apache/cassandra/io/sstable/SSTable.java | 9 +-
.../io/sstable/format/SSTableReader.java | 104 ++---
.../io/sstable/format/SSTableWriter.java | 3 +-
.../io/sstable/format/big/BigTableWriter.java | 57 ++-
.../io/util/AbstractReaderFileProxy.java | 2 +-
.../io/util/BufferManagingRebufferer.java | 12 +-
.../io/util/BufferedSegmentedFile.java | 58 ---
.../io/util/ChecksummedRandomAccessReader.java | 101 +----
.../io/util/ChecksummedRebufferer.java | 76 ++++
.../apache/cassandra/io/util/ChunkReader.java | 5 -
.../io/util/CompressedChunkReader.java | 227 ++++++++++
.../io/util/CompressedSegmentedFile.java | 358 ---------------
.../cassandra/io/util/CorruptFileException.java | 31 ++
.../io/util/DiskOptimizationStrategy.java | 48 ++
.../apache/cassandra/io/util/FileHandle.java | 440 +++++++++++++++++++
.../cassandra/io/util/ICompressedFile.java | 26 --
.../cassandra/io/util/MmapRebufferer.java | 2 +-
.../cassandra/io/util/MmappedRegions.java | 5 +
.../cassandra/io/util/MmappedSegmentedFile.java | 117 -----
.../cassandra/io/util/RandomAccessReader.java | 192 ++------
.../apache/cassandra/io/util/Rebufferer.java | 4 +-
.../apache/cassandra/io/util/SegmentedFile.java | 318 --------------
.../cassandra/io/util/SimpleChunkReader.java | 10 +-
.../util/SpinningDiskOptimizationStrategy.java | 31 ++
.../io/util/SsdDiskOptimizationStrategy.java | 49 +++
test/unit/org/apache/cassandra/MockSchema.java | 12 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 6 +-
.../apache/cassandra/db/DirectoriesTest.java | 15 +-
.../apache/cassandra/db/RowIndexEntryTest.java | 3 +-
.../db/lifecycle/LogTransactionTest.java | 12 +-
.../CompressedRandomAccessReaderTest.java | 223 +++++-----
.../CompressedSequentialWriterTest.java | 69 ++-
.../cassandra/io/sstable/DescriptorTest.java | 6 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 10 +-
.../cassandra/io/sstable/SSTableUtils.java | 3 +-
.../metadata/MetadataSerializerTest.java | 8 +-
.../io/util/BufferedRandomAccessFileTest.java | 404 +++++++++--------
.../util/ChecksummedRandomAccessReaderTest.java | 50 +--
.../io/util/DiskOptimizationStrategyTest.java | 85 ++++
.../io/util/RandomAccessReaderTest.java | 77 ++--
.../cassandra/io/util/SegmentedFileTest.java | 88 ----
51 files changed, 1615 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e2444a..f5d1d01 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
* Add supplied username to authentication error messages (CASSANDRA-12076)
* Remove pre-startup check for open JMX port (CASSANDRA-12074)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index cb2ad8a..b98ad53 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
+import org.apache.cassandra.io.util.CorruptFileException;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -91,7 +91,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
public InputStream getInputStream(File dataPath, File crcPath) throws IOException
{
- return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build();
+ return ChecksummedRandomAccessReader.open(dataPath, crcPath);
}
public OutputStream getOutputStream(File dataPath, File crcPath)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/cache/ChunkCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index e6296bd..0845944 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -184,7 +184,7 @@ public class ChunkCache
return instance.wrap(file);
}
- public void invalidatePosition(SegmentedFile dfile, long position)
+ public void invalidatePosition(FileHandle dfile, long position)
{
if (!(dfile.rebuffererFactory() instanceof CachingRebufferer))
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1375a39..be62f10 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -46,7 +46,10 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
+import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.scheduler.IRequestScheduler;
@@ -108,6 +111,8 @@ public class DatabaseDescriptor
private static EncryptionContext encryptionContext;
private static boolean hasLoggedConfig;
+ private static DiskOptimizationStrategy diskOptimizationStrategy;
+
public static void forceStaticInitialization() {}
static
{
@@ -124,6 +129,15 @@ public class DatabaseDescriptor
{
applyConfig(loadConfig());
}
+ switch (conf.disk_optimization_strategy)
+ {
+ case ssd:
+ diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+ break;
+ case spinning:
+ diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+ break;
+ }
}
catch (Exception e)
{
@@ -1862,15 +1876,9 @@ public class DatabaseDescriptor
return conf.buffer_pool_use_heap_if_exhausted;
}
- public static Config.DiskOptimizationStrategy getDiskOptimizationStrategy()
- {
- return conf.disk_optimization_strategy;
- }
-
- @VisibleForTesting
- public static void setDiskOptimizationStrategy(Config.DiskOptimizationStrategy strategy)
+ public static DiskOptimizationStrategy getDiskOptimizationStrategy()
{
- conf.disk_optimization_strategy = strategy;
+ return diskOptimizationStrategy;
}
public static double getDiskOptimizationEstimatePercentile()
@@ -1878,17 +1886,6 @@ public class DatabaseDescriptor
return conf.disk_optimization_estimate_percentile;
}
- public static double getDiskOptimizationPageCrossChance()
- {
- return conf.disk_optimization_page_cross_chance;
- }
-
- @VisibleForTesting
- public static void setDiskOptimizationPageCrossChance(double chance)
- {
- conf.disk_optimization_page_cross_chance = chance;
- }
-
public static long getTotalCommitlogSpaceInMB()
{
return conf.commitlog_total_space_in_mb;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index dd1fdb7..d030329 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.MetricNameFactory;
@@ -229,7 +229,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
return new RowIndexEntry<>(dataFilePosition);
}
- public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+ public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
return null;
}
@@ -480,7 +480,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
}
@Override
- public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+ public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime)
+ TypeSizes.sizeof(0); // columnIndexCount
@@ -715,7 +715,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
}
@Override
- public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+ public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
indexEntrySizeHistogram.update(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize);
indexInfoCountHistogram.update(columnsIndex.length);
@@ -879,7 +879,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
}
@Override
- public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+ public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
indexEntrySizeHistogram.update(indexedPartSize + fieldsSerializedSize);
indexInfoCountHistogram.update(columnsIndexCount);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 005bb2c..16aa30a 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
@@ -47,7 +47,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
private final boolean isForThrift;
- protected final SegmentedFile ifile;
+ protected final FileHandle ifile;
private boolean isClosed;
@@ -63,7 +63,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
Slices slices,
ColumnFilter columnFilter,
boolean isForThrift,
- SegmentedFile ifile)
+ FileHandle ifile)
{
this.sstable = sstable;
this.ifile = ifile;
@@ -453,7 +453,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
// Marks the beginning of the block corresponding to currentIndexIdx.
private DataPosition mark;
- public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed, SegmentedFile indexFile)
+ public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed, FileHandle indexFile)
{
this.reader = reader;
this.comparator = comparator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index e4f6700..ad3732f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
/**
* A Cell Iterator over SSTable
@@ -39,7 +39,7 @@ public class SSTableIterator extends AbstractSSTableIterator
Slices slices,
ColumnFilter columns,
boolean isForThrift,
- SegmentedFile ifile)
+ FileHandle ifile)
{
super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 6fef70f..e65376e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.btree.BTree;
/**
@@ -42,7 +42,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
Slices slices,
ColumnFilter columns,
boolean isForThrift,
- SegmentedFile ifile)
+ FileHandle ifile)
{
super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index a9e9038..4594080 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -121,8 +121,7 @@ public class CommitLogReader
// just transform from the file name (no reading of headers) to determine version
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- try(ChannelProxy channel = new ChannelProxy(file);
- RandomAccessReader reader = RandomAccessReader.open(channel))
+ try(RandomAccessReader reader = RandomAccessReader.open(file))
{
if (desc.version < CommitLogDescriptor.VERSION_21)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 7840985..13611a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -27,11 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Objects;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
@@ -63,26 +61,17 @@ public class Descriptor
private final int hashCode;
/**
- * A descriptor that assumes CURRENT_VERSION.
- */
- @VisibleForTesting
- public Descriptor(File directory, String ksname, String cfname, int generation)
- {
- this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, DatabaseDescriptor.getSSTableFormat(), null);
- }
-
- /**
* Constructor for sstable writers only.
*/
public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
- this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()));
+ this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
}
@VisibleForTesting
public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
- this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()));
+ this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
}
public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 923ef82..601f5a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -68,12 +69,9 @@ public abstract class SSTable
public DecoratedKey first;
public DecoratedKey last;
- protected SSTable(Descriptor descriptor, CFMetaData metadata)
- {
- this(descriptor, new HashSet<>(), metadata);
- }
+ protected final DiskOptimizationStrategy optimizationStrategy;
- protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
+ protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, DiskOptimizationStrategy optimizationStrategy)
{
// In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
// full schema definition. SSTableLoader use that ability
@@ -86,6 +84,7 @@ public abstract class SSTable
this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
this.components = new CopyOnWriteArraySet<>(dataComponents);
this.metadata = metadata;
+ this.optimizationStrategy = Objects.requireNonNull(optimizationStrategy);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 91b71cc..78a6825 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -37,11 +37,14 @@ import org.slf4j.LoggerFactory;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
+
+import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
@@ -190,8 +193,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public final UniqueIdentifier instanceId = new UniqueIdentifier();
// indexfile and datafile: might be null before a call to load()
- protected SegmentedFile ifile;
- protected SegmentedFile dfile;
+ protected FileHandle ifile;
+ protected FileHandle dfile;
protected IndexSummary indexSummary;
protected IFilter bf;
@@ -430,16 +433,20 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
OpenReason.NORMAL,
header == null? null : header.toHeader(metadata));
- // special implementation of load to use non-pooled SegmentedFile builders
- try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
- SegmentedFile.Builder dbuilder = sstable.compression
- ? new CompressedSegmentedFile.Builder(null)
- : new BufferedSegmentedFile.Builder())
+ try(FileHandle.Builder ibuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))
+ .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance);
+ FileHandle.Builder dbuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.DATA)).compressed(sstable.compression)
+ .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance))
{
- if (!sstable.loadSummary(ibuilder, dbuilder))
+ if (!sstable.loadSummary())
sstable.buildSummary(false, false, Downsampling.BASE_SAMPLING_LEVEL);
- sstable.ifile = ibuilder.buildIndex(sstable.descriptor, sstable.indexSummary);
- sstable.dfile = dbuilder.buildData(sstable.descriptor, statsMetadata);
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ int dataBufferSize = sstable.optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+ int indexBufferSize = sstable.optimizationStrategy.bufferSize(indexFileLength / sstable.indexSummary.size());
+ sstable.ifile = ibuilder.bufferSize(sstable.optimizationStrategy.bufferSize(indexBufferSize)).complete();
+ sstable.dfile = dbuilder.bufferSize(sstable.optimizationStrategy.bufferSize(dataBufferSize)).complete();
sstable.bf = FilterFactory.AlwaysPresent;
sstable.setup(false);
return sstable;
@@ -578,8 +585,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static SSTableReader internalOpen(Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- SegmentedFile ifile,
- SegmentedFile dfile,
+ FileHandle ifile,
+ FileHandle dfile,
IndexSummary isummary,
IFilter bf,
long maxDataAge,
@@ -622,7 +629,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
OpenReason openReason,
SerializationHeader header)
{
- super(desc, components, metadata);
+ super(desc, components, metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
this.sstableMetadata = sstableMetadata;
this.header = header;
this.maxDataAge = maxDataAge;
@@ -730,10 +737,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
{
- try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
+ try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+ .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance);
+ FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+ .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance))
{
- boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+ boolean summaryLoaded = loadSummary();
boolean builtSummary = false;
if (recreateBloomFilter || !summaryLoaded)
{
@@ -741,13 +752,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
builtSummary = true;
}
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ int dataBufferSize = optimizationStrategy.bufferSize(sstableMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+
if (components.contains(Component.PRIMARY_INDEX))
- ifile = ibuilder.buildIndex(descriptor, indexSummary);
+ {
+ int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+ ifile = ibuilder.bufferSize(indexBufferSize).complete();
+ }
- dfile = dbuilder.buildData(descriptor, sstableMetadata);
+ dfile = dbuilder.bufferSize(dataBufferSize).complete();
if (saveSummaryIfCreated && builtSummary)
- saveSummary(ibuilder, dbuilder);
+ saveSummary();
}
catch (Throwable t)
{ // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
@@ -835,12 +852,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* if loaded index summary has different index interval from current value stored in schema,
* then Summary.db file will be deleted and this returns false to rebuild summary.
*
- * @param ibuilder
- * @param dbuilder
* @return true if index summary is loaded successfully from Summary.db file.
*/
@SuppressWarnings("resource")
- public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ public boolean loadSummary()
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (!summariesFile.exists())
@@ -855,8 +870,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
first = decorateKey(ByteBufferUtil.readWithLength(iStream));
last = decorateKey(ByteBufferUtil.readWithLength(iStream));
- ibuilder.deserializeBounds(iStream, descriptor.version);
- dbuilder.deserializeBounds(iStream, descriptor.version);
}
catch (IOException e)
{
@@ -879,25 +892,22 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
/**
* Save index summary to Summary.db file.
- *
- * @param ibuilder
- * @param dbuilder
*/
- public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ public void saveSummary()
{
- saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, indexSummary);
+ saveSummary(this.descriptor, this.first, this.last, indexSummary);
}
- private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary newSummary)
+ private void saveSummary(IndexSummary newSummary)
{
- saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, newSummary);
+ saveSummary(this.descriptor, this.first, this.last, newSummary);
}
+
/**
* Save index summary to Summary.db file.
*/
- public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last,
- SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+ public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last, IndexSummary summary)
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (summariesFile.exists())
@@ -908,8 +918,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
- ibuilder.serializeBounds(oStream, descriptor.version);
- dbuilder.serializeBounds(oStream, descriptor.version);
}
catch (IOException e)
{
@@ -1044,13 +1052,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private static class DropPageCache implements Runnable
{
- final SegmentedFile dfile;
+ final FileHandle dfile;
final long dfilePosition;
- final SegmentedFile ifile;
+ final FileHandle ifile;
final long ifilePosition;
final Runnable andThen;
- private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile, long ifilePosition, Runnable andThen)
+ private DropPageCache(FileHandle dfile, long dfilePosition, FileHandle ifile, long ifilePosition, Runnable andThen)
{
this.dfile = dfile;
this.dfilePosition = dfilePosition;
@@ -1114,12 +1122,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
"no adjustments to min/max_index_interval");
}
- //Always save the resampled index
- try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
- SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
- {
- saveSummary(ibuilder, dbuilder, newSummary);
- }
+ // Always save the resampled index
+ saveSummary(newSummary);
long newSize = bytesOnDisk();
StorageMetrics.load.inc(newSize - oldSize);
@@ -1241,7 +1245,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (!compression)
throw new IllegalStateException(this + " is not compressed");
- return ((ICompressedFile) dfile).getMetadata();
+ return dfile.compressionMetadata().get();
}
/**
@@ -1596,9 +1600,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public void setCrcCheckChance(double crcCheckChance)
{
this.crcCheckChance = crcCheckChance;
- if (compression)
- ((CompressedSegmentedFile)dfile).metadata.parameters.setCrcCheckChance(crcCheckChance);
-
+ dfile.compressionMetadata().ifPresent(metadata -> metadata.parameters.setCrcCheckChance(crcCheckChance));
}
/**
@@ -1946,7 +1948,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return ifile.channel;
}
- public SegmentedFile getIndexFile()
+ public FileHandle getIndexFile()
{
return ifile;
}
@@ -2083,8 +2085,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
private IFilter bf;
private IndexSummary summary;
- private SegmentedFile dfile;
- private SegmentedFile ifile;
+ private FileHandle dfile;
+ private FileHandle ifile;
private Runnable runOnClose;
private boolean isReplaced = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 9f2e159..9fb5f7c 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
@@ -79,7 +80,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
SerializationHeader header,
Collection<SSTableFlushObserver> observers)
{
- super(descriptor, components(metadata), metadata);
+ super(descriptor, components(metadata), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c1d9bbc..26b1543 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -23,12 +23,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,11 +56,12 @@ public class BigTableWriter extends SSTableWriter
private final ColumnIndex columnIndexWriter;
private final IndexWriter iwriter;
- private final SegmentedFile.Builder dbuilder;
+ private final FileHandle.Builder dbuilder;
protected final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
private DataPosition dataMark;
private long lastEarlyOpenLength = 0;
+ private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);
private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
.trickleFsync(DatabaseDescriptor.getTrickleFsync())
@@ -85,16 +88,17 @@ public class BigTableWriter extends SSTableWriter
writerOption,
metadata.params.compression,
metadataCollector);
- dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
}
else
{
dataFile = new ChecksummedSequentialWriter(new File(getFilename()),
- new File(descriptor.filenameFor(Component.CRC)),
- new File(descriptor.filenameFor(descriptor.digestComponent)),
- writerOption);
- dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
+ new File(descriptor.filenameFor(Component.CRC)),
+ new File(descriptor.filenameFor(descriptor.digestComponent)),
+ writerOption);
}
+ dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+ .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+ chunkCache.ifPresent(dbuilder::withChunkCache);
iwriter = new IndexWriter(keyCount);
columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer());
@@ -277,8 +281,13 @@ public class BigTableWriter extends SSTableWriter
assert boundary.indexLength > 0 && boundary.dataLength > 0;
// open the reader early
IndexSummary indexSummary = iwriter.summary.build(metadata.partitioner, boundary);
- SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, indexSummary, boundary);
- SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary);
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+ FileHandle ifile = iwriter.builder.bufferSize(optimizationStrategy.bufferSize(indexBufferSize)).complete(boundary.indexLength);
+ if (compression)
+ dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
+ int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+ FileHandle dfile = dbuilder.bufferSize(optimizationStrategy.bufferSize(dataBufferSize)).complete(boundary.dataLength);
invalidateCacheAtBoundary(dfile);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components, metadata,
@@ -291,10 +300,12 @@ public class BigTableWriter extends SSTableWriter
return sstable;
}
- void invalidateCacheAtBoundary(SegmentedFile dfile)
+ void invalidateCacheAtBoundary(FileHandle dfile)
{
- if (ChunkCache.instance != null && lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
- ChunkCache.instance.invalidatePosition(dfile, lastEarlyOpenLength);
+ chunkCache.ifPresent(cache -> {
+ if (lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
+ cache.invalidatePosition(dfile, lastEarlyOpenLength);
+ });
lastEarlyOpenLength = dfile.dataLength();
}
@@ -304,11 +315,11 @@ public class BigTableWriter extends SSTableWriter
dataFile.sync();
iwriter.indexFile.sync();
- return openFinal(descriptor, SSTableReader.OpenReason.EARLY);
+ return openFinal(SSTableReader.OpenReason.EARLY);
}
@SuppressWarnings("resource")
- private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
+ private SSTableReader openFinal(SSTableReader.OpenReason openReason)
{
if (maxDataAge < 0)
maxDataAge = System.currentTimeMillis();
@@ -316,10 +327,15 @@ public class BigTableWriter extends SSTableWriter
StatsMetadata stats = statsMetadata();
// finalize in-memory state for the reader
IndexSummary indexSummary = iwriter.summary.build(this.metadata.partitioner);
- SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary);
- SegmentedFile dfile = dbuilder.buildData(desc, stats);
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+ int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+ FileHandle ifile = iwriter.builder.bufferSize(optimizationStrategy.bufferSize(indexBufferSize)).complete();
+ if (compression)
+ dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
+ FileHandle dfile = dbuilder.bufferSize(optimizationStrategy.bufferSize(dataBufferSize)).complete();
invalidateCacheAtBoundary(dfile);
- SSTableReader sstable = SSTableReader.internalOpen(desc,
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components,
this.metadata,
ifile,
@@ -355,7 +371,7 @@ public class BigTableWriter extends SSTableWriter
SSTable.appendTOC(descriptor, components);
if (openResult)
- finalReader = openFinal(descriptor, SSTableReader.OpenReason.NORMAL);
+ finalReader = openFinal(SSTableReader.OpenReason.NORMAL);
}
protected Throwable doCommit(Throwable accumulate)
@@ -415,7 +431,7 @@ public class BigTableWriter extends SSTableWriter
class IndexWriter extends AbstractTransactional implements Transactional
{
private final SequentialWriter indexFile;
- public final SegmentedFile.Builder builder;
+ public final FileHandle.Builder builder;
public final IndexSummaryBuilder summary;
public final IFilter bf;
private DataPosition mark;
@@ -423,7 +439,8 @@ public class BigTableWriter extends SSTableWriter
IndexWriter(long keyCount)
{
indexFile = new SequentialWriter(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption);
- builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+ builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+ chunkCache.ifPresent(builder::withChunkCache);
summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder());
// register listeners to be alerted when the data files are flushed
@@ -507,7 +524,7 @@ public class BigTableWriter extends SSTableWriter
summary.prepareToCommit();
try (IndexSummary indexSummary = summary.build(getPartitioner()))
{
- SSTableReader.saveSummary(descriptor, first, last, builder, dbuilder, indexSummary);
+ SSTableReader.saveSummary(descriptor, first, last, indexSummary);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
index 5dc0d37..7962c0f 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
@@ -23,7 +23,7 @@ public abstract class AbstractReaderFileProxy implements ReaderFileProxy
protected final ChannelProxy channel;
protected final long fileLength;
- public AbstractReaderFileProxy(ChannelProxy channel, long fileLength)
+ protected AbstractReaderFileProxy(ChannelProxy channel, long fileLength)
{
this.channel = channel;
this.fileLength = fileLength >= 0 ? fileLength : channel.size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index 95af31f..1648bcf 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -21,6 +21,7 @@
package org.apache.cassandra.io.util;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.cassandra.utils.memory.BufferPool;
@@ -36,19 +37,12 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer
protected final ByteBuffer buffer;
protected long offset = 0;
- public static BufferManagingRebufferer on(ChunkReader wrapped)
- {
- return wrapped.alignmentRequired()
- ? new Aligned(wrapped)
- : new Unaligned(wrapped);
- }
-
abstract long alignedPosition(long position);
- public BufferManagingRebufferer(ChunkReader wrapped)
+ protected BufferManagingRebufferer(ChunkReader wrapped)
{
this.source = wrapped;
- buffer = RandomAccessReader.allocateBuffer(wrapped.chunkSize(), wrapped.preferredBufferType());
+ buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
buffer.limit(0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
deleted file mode 100644
index a46ec14..0000000
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.io.compress.BufferType;
-
-public class BufferedSegmentedFile extends SegmentedFile
-{
- public BufferedSegmentedFile(ChannelProxy channel, int bufferSize, long length)
- {
- this(channel, createRebufferer(channel, length, bufferSize), length);
- }
-
- private BufferedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
- {
- super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
- }
-
- private BufferedSegmentedFile(BufferedSegmentedFile copy)
- {
- super(copy);
- }
-
- private static RebuffererFactory createRebufferer(ChannelProxy channel, long length, int bufferSize)
- {
- return ChunkCache.maybeWrap(new SimpleChunkReader(channel, length, BufferType.OFF_HEAP, bufferSize));
- }
-
- public static class Builder extends SegmentedFile.Builder
- {
- public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
- {
- long length = overrideLength > 0 ? overrideLength : channel.size();
- return new BufferedSegmentedFile(channel, bufferSize, length);
- }
- }
-
- public BufferedSegmentedFile sharedCopy()
- {
- return new BufferedSegmentedFile(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 25ef615..e1f795a 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -20,101 +20,26 @@ package org.apache.cassandra.io.util;
import java.io.File;
import java.io.IOException;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ChecksumType;
-public class ChecksummedRandomAccessReader
+public final class ChecksummedRandomAccessReader
{
- @SuppressWarnings("serial")
- public static class CorruptFileException extends RuntimeException
+ public static RandomAccessReader open(File file, File crcFile) throws IOException
{
- public final String filePath;
-
- public CorruptFileException(Exception cause, String filePath)
- {
- super(cause);
- this.filePath = filePath;
- }
- }
-
- static class ChecksummedRebufferer extends BufferManagingRebufferer
- {
- private final DataIntegrityMetadata.ChecksumValidator validator;
-
- public ChecksummedRebufferer(ChannelProxy channel, ChecksumValidator validator)
- {
- super(new SimpleChunkReader(channel, channel.size(), BufferType.ON_HEAP, validator.chunkSize));
- this.validator = validator;
- }
-
- @Override
- public BufferHolder rebuffer(long desiredPosition)
- {
- if (desiredPosition != offset + buffer.position())
- validator.seek(desiredPosition);
-
- // align with buffer size, as checksums were computed in chunks of buffer size each.
- offset = alignedPosition(desiredPosition);
- source.readChunk(offset, buffer);
-
- try
- {
- validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
- }
- catch (IOException e)
- {
- throw new CorruptFileException(e, channel().filePath());
- }
-
- return this;
- }
-
- @Override
- public void close()
+ ChannelProxy channel = new ChannelProxy(file);
+ try
{
- try
- {
- source.close();
- }
- finally
- {
- validator.close();
- }
- }
-
- @Override
- long alignedPosition(long desiredPosition)
- {
- return (desiredPosition / buffer.capacity()) * buffer.capacity();
- }
- }
-
- public static final class Builder extends RandomAccessReader.Builder
- {
- private final DataIntegrityMetadata.ChecksumValidator validator;
-
- @SuppressWarnings("resource")
- public Builder(File file, File crcFile) throws IOException
- {
- super(new ChannelProxy(file));
- this.validator = new DataIntegrityMetadata.ChecksumValidator(ChecksumType.CRC32,
- RandomAccessReader.open(crcFile),
- file.getPath());
- }
-
- @Override
- protected Rebufferer createRebufferer()
- {
- return new ChecksummedRebufferer(channel, validator);
+ DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(ChecksumType.CRC32,
+ RandomAccessReader.open(crcFile),
+ file.getPath());
+ Rebufferer rebufferer = new ChecksummedRebufferer(channel, validator);
+ // Always own and close the channel.
+ return new RandomAccessReader.RandomAccessReaderWithOwnChannel(rebufferer);
}
-
- @Override
- public RandomAccessReader build()
+ catch (Throwable t)
{
- // Always own and close the channel.
- return buildWithChannel();
+ channel.close();
+ throw t;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
new file mode 100644
index 0000000..bc8695f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class ChecksummedRebufferer extends BufferManagingRebufferer
+{
+ private final DataIntegrityMetadata.ChecksumValidator validator;
+
+ ChecksummedRebufferer(ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator)
+ {
+ super(new SimpleChunkReader(channel, channel.size(), BufferType.ON_HEAP, validator.chunkSize));
+ this.validator = validator;
+ }
+
+ @Override
+ public BufferHolder rebuffer(long desiredPosition)
+ {
+ if (desiredPosition != offset + buffer.position())
+ validator.seek(desiredPosition);
+
+ // align with buffer size, as checksums were computed in chunks of buffer size each.
+ offset = alignedPosition(desiredPosition);
+ source.readChunk(offset, buffer);
+
+ try
+ {
+ validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+ }
+ catch (IOException e)
+ {
+ throw new CorruptFileException(e, channel().filePath());
+ }
+
+ return this;
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ source.close();
+ }
+ finally
+ {
+ validator.close();
+ }
+ }
+
+ @Override
+ long alignedPosition(long desiredPosition)
+ {
+ return (desiredPosition / buffer.capacity()) * buffer.capacity();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java b/src/java/org/apache/cassandra/io/util/ChunkReader.java
index a04299a..1d3439e 100644
--- a/src/java/org/apache/cassandra/io/util/ChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -44,11 +44,6 @@ public interface ChunkReader extends RebuffererFactory
int chunkSize();
/**
- * If true, positions passed to this rebufferer must be aligned to chunkSize.
- */
- boolean alignmentRequired();
-
- /**
* Specifies type of buffer the caller should attempt to give.
* This is not guaranteed to be fulfilled.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
new file mode 100644
index 0000000..5f8751a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.compress.CorruptBlockException;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
+{
+ final CompressionMetadata metadata;
+
+ protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
+ {
+ super(channel, metadata.dataLength);
+ this.metadata = metadata;
+ assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
+ }
+
+ @VisibleForTesting
+ public double getCrcCheckChance()
+ {
+ return metadata.parameters.getCrcCheckChance();
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
+ getClass().getSimpleName(),
+ channel.filePath(),
+ metadata.compressor().getClass().getSimpleName(),
+ metadata.chunkLength(),
+ metadata.dataLength);
+ }
+
+ @Override
+ public int chunkSize()
+ {
+ return metadata.chunkLength();
+ }
+
+ @Override
+ public BufferType preferredBufferType()
+ {
+ return metadata.compressor().preferredBufferType();
+ }
+
+ @Override
+ public Rebufferer instantiateRebufferer()
+ {
+ return new BufferManagingRebufferer.Aligned(this);
+ }
+
+ public static class Standard extends CompressedChunkReader
+ {
+ // we read the raw compressed bytes into this buffer, then uncompress them into the provided one.
+ private final ThreadLocal<ByteBuffer> compressedHolder;
+
+ public Standard(ChannelProxy channel, CompressionMetadata metadata)
+ {
+ super(channel, metadata);
+ compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
+ }
+
+ public ByteBuffer allocateBuffer()
+ {
+ return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+ }
+
+ public ByteBuffer allocateBuffer(int size)
+ {
+ return metadata.compressor().preferredBufferType().allocate(size);
+ }
+
+ @Override
+ public void readChunk(long position, ByteBuffer uncompressed)
+ {
+ try
+ {
+ // accesses must always be aligned
+ assert (position & -uncompressed.capacity()) == position;
+ assert position <= fileLength;
+
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+ ByteBuffer compressed = compressedHolder.get();
+
+ if (compressed.capacity() < chunk.length)
+ {
+ compressed = allocateBuffer(chunk.length);
+ compressedHolder.set(compressed);
+ }
+ else
+ {
+ compressed.clear();
+ }
+
+ compressed.limit(chunk.length);
+ if (channel.read(compressed, chunk.offset) != chunk.length)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+
+ compressed.flip();
+ uncompressed.clear();
+
+ try
+ {
+ metadata.compressor().uncompress(compressed, uncompressed);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ }
+ finally
+ {
+ uncompressed.flip();
+ }
+
+ if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ {
+ compressed.rewind();
+ int checksum = (int) metadata.checksumType.of(compressed);
+
+ compressed.clear().limit(Integer.BYTES);
+ if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
+ || compressed.getInt(0) != checksum)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ }
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, channel.filePath());
+ }
+ }
+ }
+
+ public static class Mmap extends CompressedChunkReader
+ {
+ protected final MmappedRegions regions;
+
+ public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+ {
+ super(channel, metadata);
+ this.regions = regions;
+ }
+
+ @Override
+ public void readChunk(long position, ByteBuffer uncompressed)
+ {
+ try
+ {
+ // accesses must always be aligned
+ assert (position & -uncompressed.capacity()) == position;
+ assert position <= fileLength;
+
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+ MmappedRegions.Region region = regions.floor(chunk.offset);
+ long segmentOffset = region.offset();
+ int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+ ByteBuffer compressedChunk = region.buffer();
+
+ compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+ uncompressed.clear();
+
+ try
+ {
+ metadata.compressor().uncompress(compressedChunk, uncompressed);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ }
+ finally
+ {
+ uncompressed.flip();
+ }
+
+ if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ {
+ compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+ int checksum = (int) metadata.checksumType.of(compressedChunk);
+
+ compressedChunk.limit(compressedChunk.capacity());
+ if (compressedChunk.getInt() != checksum)
+ throw new CorruptBlockException(channel.filePath(), chunk);
+ }
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, channel.filePath());
+ }
+
+ }
+
+ public void close()
+ {
+ regions.closeQuietly();
+ super.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
deleted file mode 100644
index 7365d40..0000000
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ThreadLocalRandom;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Ints;
-
-import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Config.DiskAccessMode;
-import org.apache.cassandra.io.compress.*;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.concurrent.Ref;
-
-public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
-{
- public final CompressionMetadata metadata;
-
- public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, Config.DiskAccessMode mode)
- {
- this(channel,
- metadata,
- mode == DiskAccessMode.mmap
- ? MmappedRegions.map(channel, metadata)
- : null);
- }
-
- public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
- {
- this(channel, metadata, regions, createRebufferer(channel, metadata, regions));
- }
-
- private static RebuffererFactory createRebufferer(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
- {
- return ChunkCache.maybeWrap(chunkReader(channel, metadata, regions));
- }
-
- public static ChunkReader chunkReader(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
- {
- return regions != null
- ? new Mmap(channel, metadata, regions)
- : new Standard(channel, metadata);
- }
-
- public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, RebuffererFactory rebufferer)
- {
- super(new Cleanup(channel, metadata, regions, rebufferer), channel, rebufferer, metadata.compressedFileLength);
- this.metadata = metadata;
- }
-
- private CompressedSegmentedFile(CompressedSegmentedFile copy)
- {
- super(copy);
- this.metadata = copy.metadata;
- }
-
- public ChannelProxy channel()
- {
- return channel;
- }
-
- private static final class Cleanup extends SegmentedFile.Cleanup
- {
- final CompressionMetadata metadata;
-
- protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, ReaderFileProxy rebufferer)
- {
- super(channel, rebufferer);
- this.metadata = metadata;
- }
- public void tidy()
- {
- if (ChunkCache.instance != null)
- {
- ChunkCache.instance.invalidateFile(name());
- }
- metadata.close();
-
- super.tidy();
- }
- }
-
- public CompressedSegmentedFile sharedCopy()
- {
- return new CompressedSegmentedFile(this);
- }
-
- public void addTo(Ref.IdentityCollection identities)
- {
- super.addTo(identities);
- metadata.addTo(identities);
- }
-
- public static class Builder extends SegmentedFile.Builder
- {
- final CompressedSequentialWriter writer;
- final Config.DiskAccessMode mode;
-
- public Builder(CompressedSequentialWriter writer)
- {
- this.writer = writer;
- this.mode = DatabaseDescriptor.getDiskAccessMode();
- }
-
- protected CompressionMetadata metadata(String path, long overrideLength)
- {
- if (writer == null)
- return CompressionMetadata.create(path);
-
- return writer.open(overrideLength);
- }
-
- public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
- {
- return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength), mode);
- }
- }
-
- public void dropPageCache(long before)
- {
- if (before >= metadata.dataLength)
- super.dropPageCache(0);
- super.dropPageCache(metadata.chunkFor(before).offset);
- }
-
- public CompressionMetadata getMetadata()
- {
- return metadata;
- }
-
- public long dataLength()
- {
- return metadata.dataLength;
- }
-
- @VisibleForTesting
- public abstract static class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
- {
- final CompressionMetadata metadata;
-
- public CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
- {
- super(channel, metadata.dataLength);
- this.metadata = metadata;
- assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
- }
-
- @VisibleForTesting
- public double getCrcCheckChance()
- {
- return metadata.parameters.getCrcCheckChance();
- }
-
- @Override
- public String toString()
- {
- return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
- getClass().getSimpleName(),
- channel.filePath(),
- metadata.compressor().getClass().getSimpleName(),
- metadata.chunkLength(),
- metadata.dataLength);
- }
-
- @Override
- public int chunkSize()
- {
- return metadata.chunkLength();
- }
-
- @Override
- public boolean alignmentRequired()
- {
- return true;
- }
-
- @Override
- public BufferType preferredBufferType()
- {
- return metadata.compressor().preferredBufferType();
- }
-
- @Override
- public Rebufferer instantiateRebufferer()
- {
- return BufferManagingRebufferer.on(this);
- }
- }
-
- static class Standard extends CompressedChunkReader
- {
- // we read the raw compressed bytes into this buffer, then uncompressed them into the provided one.
- private final ThreadLocal<ByteBuffer> compressedHolder;
-
- public Standard(ChannelProxy channel, CompressionMetadata metadata)
- {
- super(channel, metadata);
- compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
- }
-
- public ByteBuffer allocateBuffer()
- {
- return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
- }
-
- public ByteBuffer allocateBuffer(int size)
- {
- return metadata.compressor().preferredBufferType().allocate(size);
- }
-
- @Override
- public void readChunk(long position, ByteBuffer uncompressed)
- {
- try
- {
- // accesses must always be aligned
- assert (position & -uncompressed.capacity()) == position;
- assert position <= fileLength;
-
- CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
- ByteBuffer compressed = compressedHolder.get();
-
- if (compressed.capacity() < chunk.length)
- {
- compressed = allocateBuffer(chunk.length);
- compressedHolder.set(compressed);
- }
- else
- {
- compressed.clear();
- }
-
- compressed.limit(chunk.length);
- if (channel.read(compressed, chunk.offset) != chunk.length)
- throw new CorruptBlockException(channel.filePath(), chunk);
-
- compressed.flip();
- uncompressed.clear();
-
- try
- {
- metadata.compressor().uncompress(compressed, uncompressed);
- }
- catch (IOException e)
- {
- throw new CorruptBlockException(channel.filePath(), chunk);
- }
- finally
- {
- uncompressed.flip();
- }
-
- if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
- {
- compressed.rewind();
- int checksum = (int) metadata.checksumType.of(compressed);
-
- compressed.clear().limit(Integer.BYTES);
- if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
- || compressed.getInt(0) != checksum)
- throw new CorruptBlockException(channel.filePath(), chunk);
- }
- }
- catch (CorruptBlockException e)
- {
- throw new CorruptSSTableException(e, channel.filePath());
- }
- }
- }
-
- static class Mmap extends CompressedChunkReader
- {
- protected final MmappedRegions regions;
-
- public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
- {
- super(channel, metadata);
- this.regions = regions;
- }
-
- @Override
- public void readChunk(long position, ByteBuffer uncompressed)
- {
- try
- {
- // accesses must always be aligned
- assert (position & -uncompressed.capacity()) == position;
- assert position <= fileLength;
-
- CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
-
- MmappedRegions.Region region = regions.floor(chunk.offset);
- long segmentOffset = region.offset();
- int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
- ByteBuffer compressedChunk = region.buffer();
-
- compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
-
- uncompressed.clear();
-
- try
- {
- metadata.compressor().uncompress(compressedChunk, uncompressed);
- }
- catch (IOException e)
- {
- throw new CorruptBlockException(channel.filePath(), chunk);
- }
- finally
- {
- uncompressed.flip();
- }
-
- if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
- {
- compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
-
- int checksum = (int) metadata.checksumType.of(compressedChunk);
-
- compressedChunk.limit(compressedChunk.capacity());
- if (compressedChunk.getInt() != checksum)
- throw new CorruptBlockException(channel.filePath(), chunk);
- }
- }
- catch (CorruptBlockException e)
- {
- throw new CorruptSSTableException(e, channel.filePath());
- }
-
- }
-
- public void close()
- {
- regions.closeQuietly();
- super.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CorruptFileException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CorruptFileException.java b/src/java/org/apache/cassandra/io/util/CorruptFileException.java
new file mode 100644
index 0000000..875d06f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CorruptFileException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+@SuppressWarnings("serial")
+public class CorruptFileException extends RuntimeException // unchecked exception that also records a file path
+{
+ public final String filePath; // the path handed to the constructor, exposed for callers to read
+ /** Wraps {@code cause} and remembers {@code filePath} -- presumably the path of the corrupt file. */
+ public CorruptFileException(Exception cause, String filePath)
+ {
+ super(cause);
+ this.filePath = filePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
new file mode 100644
index 0000000..e10342d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public interface DiskOptimizationStrategy
+{
+ // The maximum buffer size, we will never buffer more than this size. Further,
+ // when the limiter is not null, i.e. when throttling is enabled, we read exactly
+ // this size, since when throttling the intention is to eventually read everything,
+ // see CASSANDRA-8630
+ // NOTE: this size is chosen both for historical consistency, as a reasonable upper bound,
+ // and because our BufferPool currently has a maximum allocation size of this.
+ int MAX_BUFFER_SIZE = 1 << 16; // 64k
+
+ /**
+ * @param recordSize record size
+ * @return the buffer size for a given record size.
+ */
+ int bufferSize(long recordSize);
+
+ /**
+ * Round {@code size} up to the next multiple of 4k, capped at {@link #MAX_BUFFER_SIZE}; non-positive sizes give 4k.
+ */
+ default int roundBufferSize(long size)
+ {
+ if (size <= 0)
+ return 4096;
+ if (size >= MAX_BUFFER_SIZE) // cap before rounding: size + 4095 wraps negative for sizes near Long.MAX_VALUE
+ return MAX_BUFFER_SIZE;
+ return (int) ((size + 4095) & ~4095L);
+ }
+}