You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:40 UTC
[16/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 1458461..a38e60f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -41,10 +41,9 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.dht.*;
@@ -199,6 +198,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// not final since we need to be able to change level on a file.
protected volatile StatsMetadata sstableMetadata;
+ public final SerializationHeader header;
+
protected final AtomicLong keyCacheHit = new AtomicLong(0);
protected final AtomicLong keyCacheRequest = new AtomicLong(0);
@@ -331,7 +332,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
String parentName = descriptor.cfname.substring(0, i);
CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
- metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
+ metadata = SecondaryIndex.newIndexMetadata(parent, def);
}
else
{
@@ -375,10 +376,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
- Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
- EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types);
+
ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+ SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
@@ -392,8 +395,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
- SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
- statsMetadata, OpenReason.NORMAL);
+ SSTableReader sstable = internalOpen(descriptor,
+ components,
+ metadata,
+ partitioner,
+ System.currentTimeMillis(),
+ statsMetadata,
+ OpenReason.NORMAL,
+ header.toHeader(metadata));
// special implementation of load to use non-pooled SegmentedFile builders
try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
@@ -421,10 +430,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
- Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
- EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ // For the 3.0+ sstable format, the (misnomed) stats component holds the serialization header, which we need to deserialize the sstable content
+ assert !descriptor.version.storeRows() || components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor;
+
+ EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types);
ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+ SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
+ assert !descriptor.version.storeRows() || header != null;
// Check if sstable is created using same partitioner.
// Partitioner can be null, which indicates older version of sstable or no stats available.
@@ -438,8 +452,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
- SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
- statsMetadata, OpenReason.NORMAL);
+ SSTableReader sstable = internalOpen(descriptor,
+ components,
+ metadata,
+ partitioner,
+ System.currentTimeMillis(),
+ statsMetadata,
+ OpenReason.NORMAL,
+ header == null ? null : header.toHeader(metadata));
// load index and filter
long start = System.nanoTime();
@@ -520,11 +540,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IFilter bf,
long maxDataAge,
StatsMetadata sstableMetadata,
- OpenReason openReason)
+ OpenReason openReason,
+ SerializationHeader header)
{
assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
- SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
reader.bf = bf;
reader.ifile = ifile;
@@ -542,11 +563,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
- OpenReason openReason)
+ OpenReason openReason,
+ SerializationHeader header)
{
Factory readerFactory = descriptor.getFormat().getReaderFactory();
- return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
}
protected SSTableReader(final Descriptor desc,
@@ -555,13 +577,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IPartitioner partitioner,
long maxDataAge,
StatsMetadata sstableMetadata,
- OpenReason openReason)
+ OpenReason openReason,
+ SerializationHeader header)
{
super(desc, components, metadata, partitioner);
this.sstableMetadata = sstableMetadata;
+ this.header = header;
this.maxDataAge = maxDataAge;
this.openReason = openReason;
- this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+ this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, desc.version, header);
}
public static long getTotalBytes(Iterable<SSTableReader> sstables)
@@ -736,12 +760,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
{
long indexPosition;
- RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
+ RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header);
while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
- RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
+ RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex);
DecoratedKey decoratedKey = partitioner.decorateKey(key);
if (first == null)
first = decoratedKey;
@@ -979,7 +1003,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
bf.sharedCopy(),
maxDataAge,
sstableMetadata,
- reason);
+ reason,
+ header);
replacement.first = newFirst;
replacement.last = last;
replacement.isSuspect.set(isSuspect.get());
@@ -1140,7 +1165,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
* modulo downsampling of the index summary). Always returns a value >= 0
*/
- public long getIndexScanPosition(RowPosition key)
+ public long getIndexScanPosition(PartitionPosition key)
{
if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0)
key = first;
@@ -1287,8 +1312,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
for (Range<Token> range : Range.normalize(ranges))
{
- RowPosition leftPosition = range.left.maxKeyBound();
- RowPosition rightPosition = range.right.maxKeyBound();
+ PartitionPosition leftPosition = range.left.maxKeyBound();
+ PartitionPosition rightPosition = range.right.maxKeyBound();
int left = summary.binarySearch(leftPosition);
if (left < 0)
@@ -1382,9 +1407,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
assert !range.isWrapAround() || range.right.isMinimum();
// truncate the range so it at most covers the sstable
- AbstractBounds<RowPosition> bounds = Range.makeRowRange(range);
- RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
- RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
+ AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range);
+ PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
+ PartitionPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
continue;
@@ -1455,14 +1480,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
/**
* Get position updating key cache and stats.
- * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
+ * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean)
*/
- public RowIndexEntry getPosition(RowPosition key, Operator op)
+ public RowIndexEntry getPosition(PartitionPosition key, Operator op)
{
return getPosition(key, op, true, false);
}
- public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
+ public RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats)
{
return getPosition(key, op, updateCacheAndStats, false);
}
@@ -1473,20 +1498,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* @param updateCacheAndStats true if updating stats and cache
* @return The index entry corresponding to the key, or null if the key is not present
*/
- protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
+ protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
- //Corresponds to a name column
- public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
- public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
-
- //Corresponds to a slice query
- public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
- public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
+ public abstract SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
+ public abstract SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift);
/**
* Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
*/
- public DecoratedKey firstKeyBeyond(RowPosition token)
+ public DecoratedKey firstKeyBeyond(PartitionPosition token)
{
if (token.compareTo(first) < 0)
return first;
@@ -1589,19 +1609,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return getScanner((RateLimiter) null);
}
- public ISSTableScanner getScanner(RateLimiter limiter)
- {
- return getScanner(DataRange.allData(partitioner), limiter);
- }
-
/**
- *
+ * @param columns the columns to return.
* @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public ISSTableScanner getScanner(DataRange dataRange)
+ public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift)
{
- return getScanner(dataRange, null);
+ return getScanner(columns, dataRange, null, isForThrift);
}
/**
@@ -1618,6 +1633,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
/**
+ * Direct I/O SSTableScanner over the entirety of the sstable.
+ *
+ * @return A Scanner over the full content of the SSTable.
+ */
+ public abstract ISSTableScanner getScanner(RateLimiter limiter);
+
+ /**
* Direct I/O SSTableScanner over a defined collection of ranges of tokens.
*
* @param ranges the range of keys to cover
@@ -1626,11 +1648,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
/**
- *
+ * @param columns the columns to return.
* @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
+ public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift);
@@ -1761,6 +1783,43 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return sstableMetadata.maxTimestamp;
}
+ public int getMinLocalDeletionTime()
+ {
+ return sstableMetadata.minLocalDeletionTime;
+ }
+
+ public int getMaxLocalDeletionTime()
+ {
+ return sstableMetadata.maxLocalDeletionTime;
+ }
+
+ public int getMinTTL()
+ {
+ return sstableMetadata.minTTL;
+ }
+
+ public int getMaxTTL()
+ {
+ return sstableMetadata.maxTTL;
+ }
+
+ public long getTotalColumnsSet()
+ {
+ return sstableMetadata.totalColumnsSet;
+ }
+
+ public long getTotalRows()
+ {
+ return sstableMetadata.totalRows;
+ }
+
+ public int getAvgColumnSetPerRow()
+ {
+ return sstableMetadata.totalRows < 0
+ ? -1
+ : (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows));
+ }
+
public Set<Integer> getAncestors()
{
try
@@ -1850,6 +1909,34 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
readMeter.mark();
}
+ /**
+ * Checks if this sstable can overlap with another one based on the min/max clustering values.
+ * If this method returns false, we're guaranteed that {@code this} and {@code other} have no overlapping
+ * data, i.e. no cells to reconcile.
+ */
+ public boolean mayOverlapsWith(SSTableReader other)
+ {
+ StatsMetadata m1 = getSSTableMetadata();
+ StatsMetadata m2 = other.getSSTableMetadata();
+
+ if (m1.minClusteringValues.isEmpty() || m1.maxClusteringValues.isEmpty() || m2.minClusteringValues.isEmpty() || m2.maxClusteringValues.isEmpty())
+ return true;
+
+ return !(compare(m1.maxClusteringValues, m2.minClusteringValues) < 0 || compare(m1.minClusteringValues, m2.maxClusteringValues) > 0);
+ }
+
+ private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2)
+ {
+ ClusteringComparator comparator = metadata.comparator;
+ for (int i = 0; i < Math.min(values1.size(), values2.size()); i++)
+ {
+ int cmp = comparator.subtype(i).compare(values1.get(i), values2.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+ return 0;
+ }
+
public static class SizeComparator implements Comparator<SSTableReader>
{
public int compare(SSTableReader o1, SSTableReader o2)
@@ -2172,7 +2259,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IPartitioner partitioner,
Long maxDataAge,
StatsMetadata sstableMetadata,
- OpenReason openReason);
+ OpenReason openReason,
+ SerializationHeader header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index f99292e..c3c69b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -18,14 +18,19 @@
package org.apache.cassandra.io.sstable.format;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import com.google.common.collect.Sets;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -37,13 +42,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.concurrent.Transactional;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
/**
* This is the API all table writers must implement.
*
@@ -57,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected final long keyCount;
protected final MetadataCollector metadataCollector;
protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+ protected final SerializationHeader header;
protected final TransactionalProxy txnProxy = txnProxy();
protected abstract TransactionalProxy txnProxy();
@@ -69,30 +68,37 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected boolean openResult;
}
- protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
{
super(descriptor, components(metadata), metadata, partitioner);
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
- this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+ this.header = header;
+ this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
}
- public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ public static SSTableWriter create(Descriptor descriptor,
+ Long keyCount,
+ Long repairedAt,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ MetadataCollector metadataCollector,
+ SerializationHeader header)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+ return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
}
- public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt)
+ public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, SerializationHeader header)
{
- return create(descriptor, keyCount, repairedAt, 0);
+ return create(descriptor, keyCount, repairedAt, 0, header);
}
- public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel)
+ public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner());
+ return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header);
}
public static SSTableWriter create(CFMetaData metadata,
@@ -100,20 +106,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional
long keyCount,
long repairedAt,
int sstableLevel,
- IPartitioner partitioner)
+ IPartitioner partitioner,
+ SerializationHeader header)
{
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector);
+ return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header);
}
- public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel)
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel);
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header);
}
- public static SSTableWriter create(String filename, long keyCount, long repairedAt)
+ public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
{
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0);
+ return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header);
}
private static Set<Component> components(CFMetaData metadata)
@@ -141,19 +148,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional
return components;
}
-
public abstract void mark();
-
/**
- * @param row
- * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ * Appends partition data to this writer.
+ *
+ * @param iterator the partition to write
+ * @return the created index entry if something was written, that is if {@code iterator}
+ * wasn't empty, {@code null} otherwise.
+ *
+ * @throws FSWriteError if a write to the dataFile fails
*/
- public abstract RowIndexEntry append(AbstractCompactedRow row);
-
- public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);
-
- public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
+ public abstract RowIndexEntry append(UnfilteredRowIterator iterator);
public abstract long getFilePointer();
@@ -244,7 +250,9 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(), repairedAt);
+ metadata.getBloomFilterFpChance(),
+ repairedAt,
+ header);
}
protected StatsMetadata statsMetadata()
@@ -276,6 +284,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
public static abstract class Factory
{
- public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
+ public abstract SSTableWriter open(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ MetadataCollector metadataCollector,
+ SerializationHeader header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index faaa89e..8077a45 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -52,6 +52,10 @@ public abstract class Version
public abstract boolean hasNewFileName();
+ public abstract boolean storeRows();
+
+ public abstract int correspondingMessagingVersion(); // Only used by storage versions that 'storeRows' so far
+
public String getVersion()
{
return version;
@@ -73,6 +77,7 @@ public abstract class Version
}
abstract public boolean isCompatible();
+ abstract public boolean isCompatibleForStreaming();
@Override
public String toString()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a1e32cf..fd0b5d5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -17,16 +17,14 @@
*/
package org.apache.cassandra.io.sstable.format.big;
+import java.util.Iterator;
+import java.util.Set;
+
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.AbstractCell;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.LazilyCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -38,9 +36,7 @@ import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileDataInput;
-
-import java.util.Iterator;
-import java.util.Set;
+import org.apache.cassandra.net.MessagingService;
/**
* Legacy bigtable format
@@ -82,38 +78,26 @@ public class BigFormat implements SSTableFormat
}
@Override
- public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version)
- {
- return AbstractCell.onDiskIterator(in, flag, expireBefore, version, cfm.comparator);
- }
-
- @Override
- public AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators)
- {
- return new LazilyCompactedRow(controller, onDiskAtomIterators);
- }
-
- @Override
- public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData cfMetaData)
+ public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header)
{
- return new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(cfMetaData.comparator));
+ return new RowIndexEntry.Serializer(metadata, version, header);
}
static class WriterFactory extends SSTableWriter.Factory
{
@Override
- public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
}
}
static class ReaderFactory extends SSTableReader.Factory
{
@Override
- public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason)
+ public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
{
- return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
}
}
@@ -143,6 +127,8 @@ public class BigFormat implements SSTableFormat
private final boolean hasRepairedAt;
private final boolean tracksLegacyCounterShards;
private final boolean newFileName;
+ public final boolean storeRows;
+ public final int correspondingMessagingVersion; // Only used by storage versions that 'storeRows' so far
public BigVersion(String version)
{
@@ -155,6 +141,8 @@ public class BigFormat implements SSTableFormat
hasRepairedAt = version.compareTo("ka") >= 0;
tracksLegacyCounterShards = version.compareTo("ka") >= 0;
newFileName = version.compareTo("la") >= 0;
+ storeRows = version.compareTo("la") >= 0;
+ correspondingMessagingVersion = storeRows ? MessagingService.VERSION_30 : MessagingService.VERSION_21;
}
@Override
@@ -200,9 +188,27 @@ public class BigFormat implements SSTableFormat
}
@Override
+ public boolean storeRows()
+ {
+ return storeRows;
+ }
+
+ @Override
+ public int correspondingMessagingVersion()
+ {
+ return correspondingMessagingVersion;
+ }
+
+ @Override
public boolean isCompatible()
{
return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
}
+
+ @Override
+ public boolean isCompatibleForStreaming()
+ {
+ return isCompatible() && version.charAt(0) == current_version.charAt(0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 3f375e7..7a7b913 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -20,13 +20,11 @@ package org.apache.cassandra.io.sstable.format.big;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.columniterator.SSTableIterator;
+import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -37,7 +35,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
@@ -55,41 +52,45 @@ public class BigTableReader extends SSTableReader
{
private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
- BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
+ BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
{
- super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+ super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header);
}
- public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
+ public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
{
- return new SSTableNamesIterator(this, key, columns);
+ return reversed
+ ? new SSTableReversedIterator(this, key, selectedColumns, isForThrift)
+ : new SSTableIterator(this, key, selectedColumns, isForThrift);
}
- public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
+ public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
{
- return new SSTableNamesIterator(this, input, key, columns, indexEntry);
+ return reversed
+ ? new SSTableReversedIterator(this, file, key, indexEntry, selectedColumns, isForThrift)
+ : new SSTableIterator(this, file, key, indexEntry, selectedColumns, isForThrift);
}
- public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
+ /**
+ * @param columns the columns to return.
+ * @param dataRange filter to use when reading the columns
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
{
- return new SSTableSliceIterator(this, key, slices, reverse);
+ return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift);
}
- public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
- {
- return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
- }
/**
+ * Direct I/O SSTableScanner over the full sstable.
*
- * @param dataRange filter to use when reading the columns
- * @return A Scanner for seeking over the rows of the SSTable.
+ * @return A Scanner for reading the full SSTable.
*/
- public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
+ public ISSTableScanner getScanner(RateLimiter limiter)
{
- return BigTableScanner.getScanner(this, dataRange, limiter);
+ return BigTableScanner.getScanner(this, limiter);
}
-
/**
* Direct I/O SSTableScanner over a defined collection of ranges of tokens.
*
@@ -109,7 +110,7 @@ public class BigTableReader extends SSTableReader
* @param updateCacheAndStats true if updating stats and cache
* @return The index entry corresponding to the key, or null if the key is not present
*/
- protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
+ protected RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast)
{
if (op == Operator.EQ)
{
@@ -212,7 +213,7 @@ public class BigTableReader extends SSTableReader
if (opSatisfied)
{
// read data position from index entry
- RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
+ RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
if (exactMatch && updateCacheAndStats)
{
assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index d477152..0794e90 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -22,16 +22,13 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Ordering;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
-import org.apache.cassandra.db.columniterator.LazyColumnIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.AbstractBounds.Boundary;
import org.apache.cassandra.dht.Bounds;
@@ -57,18 +54,27 @@ public class BigTableScanner implements ISSTableScanner
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
- private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
- private AbstractBounds<RowPosition> currentRange;
+ private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator;
+ private AbstractBounds<PartitionPosition> currentRange;
+ private final ColumnFilter columns;
private final DataRange dataRange;
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+ private final boolean isForThrift;
- protected Iterator<OnDiskAtomIterator> iterator;
+ protected UnfilteredPartitionIterator iterator;
- public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ // Full scan of the sstables
+ public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
{
- return new BigTableScanner(sstable, dataRange, limiter);
+ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, Iterators.singletonIterator(fullRange(sstable)));
}
+
+ public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift)
+ {
+ return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator());
+ }
+
public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
@@ -76,60 +82,54 @@ public class BigTableScanner implements ISSTableScanner
if (positions.isEmpty())
return new EmptySSTableScanner(sstable.getFilename());
- return new BigTableScanner(sstable, tokenRanges, limiter);
+ return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator());
}
- /**
- * @param sstable SSTable to scan; must not be null
- * @param dataRange a single range to scan; must not be null
- * @param limiter background i/o RateLimiter; may be null
- */
- private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+ private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
assert sstable != null;
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
+ this.columns = columns;
this.dataRange = dataRange;
- this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
-
- List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
- addRange(dataRange.keyRange(), boundsList);
- this.rangeIterator = boundsList.iterator();
+ this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+ sstable.descriptor.version,
+ sstable.header);
+ this.isForThrift = isForThrift;
+ this.rangeIterator = rangeIterator;
}
- /**
- * @param sstable SSTable to scan; must not be null
- * @param tokenRanges A set of token ranges to scan
- * @param limiter background i/o RateLimiter; may be null
- */
- private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+ private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
{
- assert sstable != null;
-
- this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
- this.ifile = sstable.openIndexReader();
- this.sstable = sstable;
- this.dataRange = null;
- this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
-
- List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(tokenRanges.size());
+ List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size());
for (Range<Token> range : Range.normalize(tokenRanges))
- addRange(Range.makeRowRange(range), boundsList);
+ addRange(sstable, Range.makeRowRange(range), boundsList);
+ return boundsList;
+ }
- this.rangeIterator = boundsList.iterator();
+ private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, DataRange dataRange)
+ {
+ List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(2);
+ addRange(sstable, dataRange.keyRange(), boundsList);
+ return boundsList;
+ }
+
+ private static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable)
+ {
+ return new Bounds<PartitionPosition>(sstable.first, sstable.last);
}
- private void addRange(AbstractBounds<RowPosition> requested, List<AbstractBounds<RowPosition>> boundsList)
+ private static void addRange(SSTableReader sstable, AbstractBounds<PartitionPosition> requested, List<AbstractBounds<PartitionPosition>> boundsList)
{
if (requested instanceof Range && ((Range)requested).isWrapAround())
{
if (requested.right.compareTo(sstable.first) >= 0)
{
// since we wrap, we must contain the whole sstable prior to stopKey()
- Boundary<RowPosition> left = new Boundary<RowPosition>(sstable.first, true);
- Boundary<RowPosition> right;
+ Boundary<PartitionPosition> left = new Boundary<PartitionPosition>(sstable.first, true);
+ Boundary<PartitionPosition> right;
right = requested.rightBoundary();
right = minRight(right, sstable.last, true);
if (!isEmpty(left, right))
@@ -138,8 +138,8 @@ public class BigTableScanner implements ISSTableScanner
if (requested.left.compareTo(sstable.last) <= 0)
{
// since we wrap, we must contain the whole sstable after dataRange.startKey()
- Boundary<RowPosition> right = new Boundary<RowPosition>(sstable.last, true);
- Boundary<RowPosition> left;
+ Boundary<PartitionPosition> right = new Boundary<PartitionPosition>(sstable.last, true);
+ Boundary<PartitionPosition> left;
left = requested.leftBoundary();
left = maxLeft(left, sstable.first, true);
if (!isEmpty(left, right))
@@ -149,12 +149,12 @@ public class BigTableScanner implements ISSTableScanner
else
{
assert requested.left.compareTo(requested.right) <= 0 || requested.right.isMinimum();
- Boundary<RowPosition> left, right;
+ Boundary<PartitionPosition> left, right;
left = requested.leftBoundary();
right = requested.rightBoundary();
left = maxLeft(left, sstable.first, true);
// apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping
- right = requested.right.isMinimum() ? new Boundary<RowPosition>(sstable.last, true)
+ right = requested.right.isMinimum() ? new Boundary<PartitionPosition>(sstable.last, true)
: minRight(right, sstable.last, true);
if (!isEmpty(left, right))
boundsList.add(AbstractBounds.bounds(left, right));
@@ -193,10 +193,18 @@ public class BigTableScanner implements ISSTableScanner
}
}
- public void close() throws IOException
+ public void close()
{
- if (isClosed.compareAndSet(false, true))
- FileUtils.close(dfile, ifile);
+ try
+ {
+ if (isClosed.compareAndSet(false, true))
+ FileUtils.close(dfile, ifile);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
}
public long getLengthInBytes()
@@ -214,6 +222,11 @@ public class BigTableScanner implements ISSTableScanner
return sstable.toString();
}
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
public boolean hasNext()
{
if (iterator == null)
@@ -221,7 +234,7 @@ public class BigTableScanner implements ISSTableScanner
return iterator.hasNext();
}
- public OnDiskAtomIterator next()
+ public UnfilteredRowIterator next()
{
if (iterator == null)
iterator = createIterator();
@@ -233,19 +246,24 @@ public class BigTableScanner implements ISSTableScanner
throw new UnsupportedOperationException();
}
- private Iterator<OnDiskAtomIterator> createIterator()
+ private UnfilteredPartitionIterator createIterator()
{
return new KeyScanningIterator();
}
- protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+ protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
{
private DecoratedKey nextKey;
private RowIndexEntry nextEntry;
private DecoratedKey currentKey;
private RowIndexEntry currentEntry;
- protected OnDiskAtomIterator computeNext()
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ protected UnfilteredRowIterator computeNext()
{
try
{
@@ -264,7 +282,7 @@ public class BigTableScanner implements ISSTableScanner
return endOfData();
currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ currentEntry = rowIndexEntrySerializer.deserialize(ifile);
} while (!currentRange.contains(currentKey));
}
else
@@ -283,7 +301,7 @@ public class BigTableScanner implements ISSTableScanner
{
// we need the position of the start of the next key, regardless of whether it falls in the current range
nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+ nextEntry = rowIndexEntrySerializer.deserialize(ifile);
if (!currentRange.contains(nextKey))
{
@@ -292,21 +310,34 @@ public class BigTableScanner implements ISSTableScanner
}
}
- if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+ /*
+ * For a given partition key, we want to avoid hitting the data
+ * file unless we're explicitly asked to. This is important
+ * for PartitionRangeReadCommand#checkCacheFilter.
+ */
+ return new LazilyInitializedUnfilteredRowIterator(currentKey)
{
- dfile.seek(currentEntry.position + currentEntry.headerOffset());
- ByteBufferUtil.readWithShortLength(dfile); // key
- return new SSTableIdentityIterator(sstable, dfile, currentKey);
- }
-
- return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
- {
- public OnDiskAtomIterator create()
+ protected UnfilteredRowIterator initializeIterator()
{
- return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ try
+ {
+ if (dataRange == null)
+ {
+ dfile.seek(currentEntry.position + currentEntry.headerOffset());
+ ByteBufferUtil.readWithShortLength(dfile); // key
+ return new SSTableIdentityIterator(sstable, dfile, partitionKey());
+ }
+
+ ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
+ return filter.filter(sstable.iterator(dfile, partitionKey(), currentEntry, columns, filter.isReversed(), isForThrift));
+ }
+ catch (CorruptSSTableException | IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, sstable.getFilename());
+ }
}
- });
-
+ };
}
catch (CorruptSSTableException | IOException e)
{
@@ -314,6 +345,11 @@ public class BigTableScanner implements ISSTableScanner
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
+
+ public void close()
+ {
+ BigTableScanner.this.close();
+ }
}
@Override
@@ -326,7 +362,7 @@ public class BigTableScanner implements ISSTableScanner
")";
}
- public static class EmptySSTableScanner implements ISSTableScanner
+ public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
{
private final String filename;
@@ -350,20 +386,19 @@ public class BigTableScanner implements ISSTableScanner
return filename;
}
+ public boolean isForThrift()
+ {
+ return false;
+ }
+
public boolean hasNext()
{
return false;
}
- public OnDiskAtomIterator next()
+ public UnfilteredRowIterator next()
{
return null;
}
-
- public void close() throws IOException { }
-
- public void remove() { }
}
-
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 30b55a0..66b8ac0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,10 +18,7 @@
package org.apache.cassandra.io.sstable.format.big;
import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -29,12 +26,13 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
@@ -47,7 +45,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.StreamingHistogram;
import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -66,9 +63,9 @@ public class BigTableWriter extends SSTableWriter
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
- BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ public BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header)
{
- super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+ super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header);
if (compression)
{
@@ -124,21 +121,38 @@ public class BigTableWriter extends SSTableWriter
}
/**
- * @param row
- * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ * Appends partition data to this writer.
+ *
+ * @param iterator the partition to write
+ * @return the created index entry if something was written, that is if {@code iterator}
+ * wasn't empty, {@code null} otherwise.
+ *
+ * @throws FSWriteError if a write to the dataFile fails
*/
- public RowIndexEntry append(AbstractCompactedRow row)
+ public RowIndexEntry append(UnfilteredRowIterator iterator)
{
- long startPosition = beforeAppend(row.key);
- RowIndexEntry entry;
- try
+ DecoratedKey key = iterator.partitionKey();
+
+ if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ {
+ logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
+ return null;
+ }
+
+ if (iterator.isEmpty())
+ return null;
+
+ long startPosition = beforeAppend(key);
+
+ try (StatsCollector withStats = new StatsCollector(iterator, metadataCollector))
{
- entry = row.write(startPosition, dataFile);
- if (entry == null)
- return null;
+ ColumnIndex index = ColumnIndex.writeAndBuildIndex(withStats, dataFile, header, descriptor.version);
+
+ RowIndexEntry entry = RowIndexEntry.create(startPosition, iterator.partitionLevelDeletion(), index);
+
long endPosition = dataFile.getFilePointer();
- metadataCollector.update(endPosition - startPosition, row.columnStats());
- afterAppend(row.key, endPosition, entry);
+ metadataCollector.addPartitionSizeInBytes(endPosition - startPosition);
+ afterAppend(key, endPosition, entry);
return entry;
}
catch (IOException e)
@@ -147,130 +161,77 @@ public class BigTableWriter extends SSTableWriter
}
}
- public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+ private static class StatsCollector extends WrappingUnfilteredRowIterator
{
- if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
- {
- logger.error("Key size {} exceeds maximum of {}, skipping row",
- decoratedKey.getKey().remaining(),
- FBUtilities.MAX_UNSIGNED_SHORT);
- return;
- }
+ private int cellCount;
+ private final MetadataCollector collector;
+ private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
- long startPosition = beforeAppend(decoratedKey);
- long endPosition;
- try
- {
- RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
- endPosition = dataFile.getFilePointer();
- afterAppend(decoratedKey, endPosition, entry);
- }
- catch (IOException e)
+ StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector)
{
- throw new FSWriteError(e, dataFile.getPath());
+ super(iter);
+ this.collector = collector;
+ collector.update(iter.partitionLevelDeletion());
}
- metadataCollector.update(endPosition - startPosition, cf.getColumnStats());
- }
-
- private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
- {
- assert cf.hasColumns() || cf.isMarkedForDelete();
- ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
- ColumnIndex index = builder.build(cf);
+ @Override
+ public Unfiltered next()
+ {
+ Unfiltered unfiltered = super.next();
+ collector.updateClusteringValues(unfiltered.clustering());
- out.writeShort(END_OF_ROW);
- return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
- }
+ switch (unfiltered.kind())
+ {
+ case ROW:
+ Row row = (Row) unfiltered;
+ collector.update(row.primaryKeyLivenessInfo());
+ collector.update(row.deletion());
- /**
- * @throws IOException if a read from the DataInput fails
- * @throws FSWriteError if a write to the dataFile fails
- */
- public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
- {
- long currentPosition = beforeAppend(key);
+ int simpleColumnsSet = 0;
+ complexColumnsSetInRow.clear();
- ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
- ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
- ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
- List<ByteBuffer> minColumnNames = Collections.emptyList();
- List<ByteBuffer> maxColumnNames = Collections.emptyList();
- StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
- boolean hasLegacyCounterShards = false;
+ for (Cell cell : row)
+ {
+ if (cell.column().isComplex())
+ complexColumnsSetInRow.add(cell.column());
+ else
+ ++simpleColumnsSet;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
- cf.delete(DeletionTime.serializer.deserialize(in));
+ ++cellCount;
+ collector.update(cell.livenessInfo());
- ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+ if (cell.isCounterCell())
+ collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell));
+ }
- if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
- {
- tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
- maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
- minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
- maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
- }
+ for (int i = 0; i < row.columns().complexColumnCount(); i++)
+ collector.update(row.getDeletion(row.columns().getComplex(i)));
- Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
- while (rangeTombstoneIterator.hasNext())
- {
- RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
- tombstones.update(rangeTombstone.getLocalDeletionTime());
- minTimestampTracker.update(rangeTombstone.timestamp());
- maxTimestampTracker.update(rangeTombstone.timestamp());
- maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
- minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
- maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
- }
+ collector.updateColumnSetPerRow(simpleColumnsSet + complexColumnsSetInRow.size());
- Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
- try
- {
- while (iter.hasNext())
- {
- OnDiskAtom atom = iter.next();
- if (atom == null)
break;
-
- if (atom instanceof CounterCell)
- {
- atom = ((CounterCell) atom).markLocalToBeCleared();
- hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
- }
-
- int deletionTime = atom.getLocalDeletionTime();
- if (deletionTime < Integer.MAX_VALUE)
- tombstones.update(deletionTime);
- minTimestampTracker.update(atom.timestamp());
- maxTimestampTracker.update(atom.timestamp());
- minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
- maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
- maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
-
- columnIndexer.add(atom); // This write the atom on disk too
+ case RANGE_TOMBSTONE_MARKER:
+ if (((RangeTombstoneMarker) unfiltered).isBoundary())
+ {
+ RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)unfiltered;
+ collector.update(bm.endDeletionTime());
+ collector.update(bm.startDeletionTime());
+ }
+ else
+ {
+ collector.update(((RangeTombstoneBoundMarker)unfiltered).deletionTime());
+ }
+ break;
}
-
- columnIndexer.maybeWriteEmptyRowHeader();
- dataFile.stream.writeShort(END_OF_ROW);
+ return unfiltered;
}
- catch (IOException e)
+
+ @Override
+ public void close()
{
- throw new FSWriteError(e, dataFile.getPath());
+ collector.addCellPerPartitionCount(cellCount);
+ super.close();
}
-
- metadataCollector.updateMinTimestamp(minTimestampTracker.get())
- .updateMaxTimestamp(maxTimestampTracker.get())
- .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
- .addRowSize(dataFile.getFilePointer() - currentPosition)
- .addColumnCount(columnIndexer.writtenAtomCount())
- .mergeTombstoneHistogram(tombstones)
- .updateMinColumnNames(minColumnNames)
- .updateMaxColumnNames(maxColumnNames)
- .updateHasLegacyCounterShards(hasLegacyCounterShards);
-
- afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
- return currentPosition;
}
private Descriptor makeTmpLinks()
@@ -303,7 +264,7 @@ public class BigTableWriter extends SSTableWriter
components, metadata,
partitioner, ifile,
dfile, iwriter.summary.build(partitioner, boundary),
- iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
+ iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
sstable.first = getMinimalKey(first);
@@ -339,7 +300,8 @@ public class BigTableWriter extends SSTableWriter
iwriter.bf.sharedCopy(),
maxDataAge,
stats,
- openReason);
+ openReason,
+ header);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
return sstable;