You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/12/28 03:46:10 UTC
git commit: Improve removal of gcable tombstones during minor
compaction patch by Vijay; reviewed by Yuki Morishita for CASSANDRA-4671
Updated Branches:
refs/heads/cassandra-1.2 6b093b4ea -> ac9f4786e
Improve removal of gcable tombstones during minor compaction
patch by Vijay; reviewed by Yuki Morishita for CASSANDRA-4671
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ac9f4786
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ac9f4786
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ac9f4786
Branch: refs/heads/cassandra-1.2
Commit: ac9f4786e4d7b3b1a414838eec465f8553bf43b2
Parents: 6b093b4
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Dec 27 18:45:21 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Dec 27 18:45:21 2012 -0800
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Column.java | 5 ++
src/java/org/apache/cassandra/db/ColumnFamily.java | 4 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 1 +
src/java/org/apache/cassandra/db/DeletionInfo.java | 10 ++++
src/java/org/apache/cassandra/db/OnDiskAtom.java | 3 +-
.../org/apache/cassandra/db/RangeTombstone.java | 5 ++
src/java/org/apache/cassandra/db/SuperColumn.java | 8 +++
.../db/compaction/CompactionController.java | 4 +-
.../cassandra/db/compaction/CompactionManager.java | 2 +-
.../db/compaction/LazilyCompactedRow.java | 10 +++-
.../cassandra/db/compaction/PrecompactedRow.java | 4 +-
.../apache/cassandra/db/compaction/Scrubber.java | 2 +-
.../apache/cassandra/io/sstable/ColumnStats.java | 4 +-
.../apache/cassandra/io/sstable/Descriptor.java | 5 ++-
.../cassandra/io/sstable/SSTableMetadata.java | 21 +++++++-
.../apache/cassandra/io/sstable/SSTableReader.java | 5 ++
.../apache/cassandra/io/sstable/SSTableWriter.java | 3 +
.../db/compaction/CompactionsPurgeTest.java | 39 ++++++++++++++-
.../cassandra/io/sstable/DescriptorTest.java | 1 +
.../io/sstable/SSTableMetadataSerializerTest.java | 4 ++
.../apache/cassandra/streaming/BootstrapTest.java | 2 +-
21 files changed, 125 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index 23eaa7e..616f3c0 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -102,6 +102,11 @@ public class Column implements IColumn
return timestamp;
}
+ public long minTimestamp()
+ {
+ return timestamp;
+ }
+
public long maxTimestamp()
{
return timestamp;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index c2dd118..6f19fc4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -380,16 +380,18 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn
public ColumnStats getColumnStats()
{
+ long minTimestampSeen = deletionInfo() == DeletionInfo.LIVE ? Long.MAX_VALUE : deletionInfo().minTimestamp();
long maxTimestampSeen = deletionInfo().maxTimestamp();
StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
for (IColumn column : columns)
{
+ minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
int deletionTime = column.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
tombstones.update(deletionTime);
}
- return new ColumnStats(getColumnCount(), maxTimestampSeen, tombstones);
+ return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, tombstones);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7b8a149..44585ce 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1947,6 +1947,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// and adds generation of live ancestors
for (SSTableReader sstable : sstables)
{
+ sstableMetadataCollector.updateMinTimestamp(sstable.getMinTimestamp());
sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
for (Integer i : sstable.getAncestors())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index b63686f..be64224 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -178,6 +178,16 @@ public class DeletionInfo
}
}
+ public long minTimestamp()
+ {
+ long minTimestamp = topLevel.markedForDeleteAt;
+ for (RangeTombstone i : ranges)
+ {
+ minTimestamp = Math.min(minTimestamp, i.data.markedForDeleteAt);
+ }
+ return minTimestamp;
+ }
+
/**
* The maximum timestamp mentioned by this DeletionInfo.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index f504ce8..7501d83 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -34,8 +34,9 @@ public interface OnDiskAtom
/**
* For a standard column, this is the same as timestamp().
- * For a super column, this is the max column timestamp of the sub columns.
+ * For a super column, this is the min/max column timestamp of the sub columns.
*/
+ public long minTimestamp();
public long maxTimestamp();
public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 1b85c59..1d472c3 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -57,6 +57,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
return data.localDeletionTime;
}
+ public long minTimestamp()
+ {
+ return data.markedForDeleteAt;
+ }
+
public long maxTimestamp()
{
return data.markedForDeleteAt;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/SuperColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumn.java b/src/java/org/apache/cassandra/db/SuperColumn.java
index f95dc8f..57e87c4 100644
--- a/src/java/org/apache/cassandra/db/SuperColumn.java
+++ b/src/java/org/apache/cassandra/db/SuperColumn.java
@@ -154,6 +154,14 @@ public class SuperColumn extends AbstractColumnContainer implements IColumn
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
+ public long minTimestamp()
+ {
+ long minTimestamp = getMarkedForDeleteAt();
+ for (IColumn subColumn : getSubColumns())
+ minTimestamp = Math.min(minTimestamp, subColumn.minTimestamp());
+ return minTimestamp;
+ }
+
public long maxTimestamp()
{
long maxTimestamp = getMarkedForDeleteAt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 2c8ddba..6104486 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -109,12 +109,12 @@ public class CompactionController
* @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
* are included in the compaction set
*/
- public boolean shouldPurge(DecoratedKey key)
+ public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
{
List<SSTableReader> filteredSSTables = overlappingTree.search(key);
for (SSTableReader sstable : filteredSSTables)
{
- if (sstable.getBloomFilter().isPresent(key.key))
+ if (sstable.getBloomFilter().isPresent(key.key) && sstable.getMinTimestamp() >= maxDeletionTimestamp)
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 36fa00f..0a31888 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -929,7 +929,7 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public boolean shouldPurge(DecoratedKey key)
+ public boolean shouldPurge(DecoratedKey key, long delTimestamp)
{
/*
* The main reason we always purge is that including gcable tombstone would mean that the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 084af98..7981d88 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -71,18 +71,20 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
super(rows.get(0).getKey());
this.rows = rows;
this.controller = controller;
- this.shouldPurge = controller.shouldPurge(key);
indexer = controller.cfs.indexManager.updaterFor(key, false);
+ long maxDelTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
+ maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
if (emptyColumnFamily == null)
emptyColumnFamily = cf;
else
emptyColumnFamily.delete(cf);
}
+ this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
try
{
@@ -94,7 +96,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
}
// reach into the reducer used during iteration to get column count, size, max column timestamp
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
- columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
+ columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
+ reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
+ reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
);
columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
@@ -236,6 +240,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
long serializedSize = 4; // int for column count
int columns = 0;
+ long minTimestampSeen = Long.MAX_VALUE;
long maxTimestampSeen = Long.MIN_VALUE;
StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
@@ -290,6 +295,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
serializedSize += reduced.serializedSizeForSSTable();
columns++;
+ minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
int deletionTime = reduced.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 2d7f55a..be4b20e 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -59,7 +59,7 @@ public class PrecompactedRow extends AbstractCompactedRow
Boolean shouldPurge = null;
if (cf.hasIrrelevantData(controller.gcBefore))
- shouldPurge = controller.shouldPurge(key);
+ shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());
// We should only gc tombstone if shouldPurge == true. But otherwise,
// it is still ok to collect column that shadowed by their (deleted)
@@ -69,7 +69,7 @@ public class PrecompactedRow extends AbstractCompactedRow
if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
{
if (shouldPurge == null)
- shouldPurge = controller.shouldPurge(key);
+ shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
if (shouldPurge)
CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index c6855e8..0601857 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -353,7 +353,7 @@ public class Scrubber implements Closeable
}
@Override
- public boolean shouldPurge(DecoratedKey key)
+ public boolean shouldPurge(DecoratedKey key, long delTimestamp)
{
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index a7dcfec..12ef534 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -28,13 +28,15 @@ public class ColumnStats
public final int columnCount;
/** the smallest and largest (client-supplied) timestamps in the row */
+ public final long minTimestamp;
public final long maxTimestamp;
/** histogram of tombstone drop time */
public final StreamingHistogram tombstoneHistogram;
- public ColumnStats(int columnCount, long maxTimestamp, StreamingHistogram tombstoneHistogram)
+ public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, StreamingHistogram tombstoneHistogram)
{
+ this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.columnCount = columnCount;
this.tombstoneHistogram = tombstoneHistogram;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18c8a41..cf1907e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -47,7 +47,7 @@ public class Descriptor
public static class Version
{
// This needs to be at the beginning for initialization sake
- private static final String current_version = "ia";
+ private static final String current_version = "ib";
public static final Version LEGACY = new Version("a"); // "pre-history"
// b (0.7.0): added version to sstable filenames
@@ -65,6 +65,7 @@ public class Descriptor
// ia (1.2.0): column indexes are promoted to the index file
// records estimated histogram of deletion times in tombstones
// bloom filter (keys and columns) upgraded to Murmur3
+ // ib (1.2.1): tracks min client timestamp in metadata component
public static final Version CURRENT = new Version(current_version);
@@ -77,6 +78,7 @@ public class Descriptor
public final boolean metadataIncludesReplayPosition;
public final boolean metadataIncludesModernReplayPosition;
public final boolean tracksMaxTimestamp;
+ public final boolean tracksMinTimestamp;
public final boolean hasCompressionRatio;
public final boolean hasPartitioner;
public final boolean tracksTombstones;
@@ -94,6 +96,7 @@ public class Descriptor
hasCompressionRatio = version.compareTo("hb") >= 0;
hasPartitioner = version.compareTo("hc") >= 0;
tracksMaxTimestamp = version.compareTo("hd") >= 0;
+ tracksMinTimestamp = version.compareTo("ib") >= 0;
hasAncestors = version.compareTo("he") >= 0;
metadataIncludesModernReplayPosition = version.compareTo("hf") >= 0;
tracksTombstones = version.compareTo("ia") >= 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index 7932c88..e53fdf9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -51,6 +51,7 @@ public class SSTableMetadata
public final EstimatedHistogram estimatedRowSize;
public final EstimatedHistogram estimatedColumnCount;
public final ReplayPosition replayPosition;
+ public final long minTimestamp;
public final long maxTimestamp;
public final double compressionRatio;
public final String partitioner;
@@ -62,6 +63,7 @@ public class SSTableMetadata
this(defaultRowSizeHistogram(),
defaultColumnCountHistogram(),
ReplayPosition.NONE,
+ Long.MAX_VALUE,
Long.MIN_VALUE,
NO_COMPRESSION_RATIO,
null,
@@ -69,12 +71,13 @@ public class SSTableMetadata
defaultTombstoneDropTimeHistogram());
}
- private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp,
- double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
+ private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long minTimestamp,
+ long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
{
this.estimatedRowSize = rowSizes;
this.estimatedColumnCount = columnCounts;
this.replayPosition = replayPosition;
+ this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.compressionRatio = cr;
this.partitioner = partitioner;
@@ -129,6 +132,7 @@ public class SSTableMetadata
protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
protected ReplayPosition replayPosition = ReplayPosition.NONE;
+ protected long minTimestamp = Long.MAX_VALUE;
protected long maxTimestamp = Long.MIN_VALUE;
protected double compressionRatio = NO_COMPRESSION_RATIO;
protected Set<Integer> ancestors = new HashSet<Integer>();
@@ -158,6 +162,11 @@ public class SSTableMetadata
compressionRatio = (double) compressed/uncompressed;
}
+ public void updateMinTimestamp(long potentialMin)
+ {
+ minTimestamp = Math.min(minTimestamp, potentialMin);
+ }
+
public void updateMaxTimestamp(long potentialMax)
{
maxTimestamp = Math.max(maxTimestamp, potentialMax);
@@ -168,6 +177,7 @@ public class SSTableMetadata
return new SSTableMetadata(estimatedRowSize,
estimatedColumnCount,
replayPosition,
+ minTimestamp,
maxTimestamp,
compressionRatio,
partitioner,
@@ -201,6 +211,7 @@ public class SSTableMetadata
void update(long size, ColumnStats stats)
{
+ updateMinTimestamp(stats.minTimestamp);
/*
* The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
* to avoid deserializing an EchoedRow.
@@ -226,6 +237,7 @@ public class SSTableMetadata
EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, dos);
EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, dos);
ReplayPosition.serializer.serialize(sstableStats.replayPosition, dos);
+ dos.writeLong(sstableStats.minTimestamp);
dos.writeLong(sstableStats.maxTimestamp);
dos.writeDouble(sstableStats.compressionRatio);
dos.writeUTF(sstableStats.partitioner);
@@ -269,6 +281,9 @@ public class SSTableMetadata
// make sure we don't omit replaying something that we should. see CASSANDRA-4782
replayPosition = ReplayPosition.NONE;
}
+ long minTimestamp = desc.version.tracksMinTimestamp ? dis.readLong() : Long.MIN_VALUE;
+ if (!desc.version.tracksMinTimestamp)
+ minTimestamp = Long.MAX_VALUE;
long maxTimestamp = desc.version.containsTimestamp() ? dis.readLong() : Long.MIN_VALUE;
if (!desc.version.tracksMaxTimestamp) // see javadoc to Descriptor.containsTimestamp
maxTimestamp = Long.MIN_VALUE;
@@ -283,7 +298,7 @@ public class SSTableMetadata
StreamingHistogram tombstoneHistogram = desc.version.tracksTombstones
? StreamingHistogram.serializer.deserialize(dis)
: defaultTombstoneDropTimeHistogram();
- return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
+ return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 6099ca8..42b4ae6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1096,6 +1096,11 @@ public class SSTableReader extends SSTable
return sstableMetadata.replayPosition;
}
+ public long getMinTimestamp()
+ {
+ return sstableMetadata.minTimestamp;
+ }
+
public long getMaxTimestamp()
{
return sstableMetadata.maxTimestamp;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 3e4656a..a3a8fe9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -234,6 +234,7 @@ public class SSTableWriter extends SSTable
}
// deserialize each column to obtain the min/max timestamps and immediately serialize it.
+ long minTimestamp = Long.MAX_VALUE;
long maxTimestamp = Long.MIN_VALUE;
StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
@@ -268,6 +269,7 @@ public class SSTableWriter extends SSTable
{
tombstones.update(deletionTime);
}
+ minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
try
{
@@ -281,6 +283,7 @@ public class SSTableWriter extends SSTable
assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
: "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+ sstableMetadataCollector.updateMinTimestamp(minTimestamp);
sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
sstableMetadataCollector.addColumnCount(columnCount);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index e8ed58c..deac172 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
+import junit.framework.Assert;
+
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -140,7 +142,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// verify that minor compaction does not GC when key is present
// in a non-compacted sstable
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
- assert cf.getColumnCount() == 10;
+ Assert.assertEquals(10, cf.getColumnCount());
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
@@ -149,6 +151,41 @@ public class CompactionsPurgeTest extends SchemaLoader
}
@Test
+ public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Table table = Table.open(TABLE2);
+ String cfName = "Standard1";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ RowMutation rm;
+ DecoratedKey key3 = Util.dk("key3");
+ // inserts
+ rm = new RowMutation(TABLE2, key3.key);
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes("c1")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ // deletes
+ rm = new RowMutation(TABLE2, key3.key);
+ rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c1")), 10);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+
+ // delete again so we have a new delete in a different SSTable.
+ rm = new RowMutation(TABLE2, key3.key);
+ rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), 9);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName)));
+ Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
+ Assert.assertEquals(1, cf.getColumnCount());
+ }
+
+ @Test
public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index 545a9ec..007e0ca 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -50,6 +50,7 @@ public class DescriptorTest
desc = Descriptor.fromFilename("Keyspace1-Standard1-gz-1-Data.db");
assert "gz".equals(desc.version.toString());
assert !desc.version.tracksMaxTimestamp;
+ assert !desc.version.tracksMinTimestamp;
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
index 562be07..ab96f89 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
@@ -43,12 +43,14 @@ public class SSTableMetadataSerializerTest
new long[] { 6L, 7L },
new long[] { 8L, 9L, 10L });
ReplayPosition rp = new ReplayPosition(11L, 12);
+ long minTimestamp = 2162517136L;
long maxTimestamp = 4162517136L;
SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
.estimatedRowSize(rowSizes)
.estimatedColumnCount(columnCounts)
.replayPosition(rp);
+ collector.updateMinTimestamp(minTimestamp);
collector.updateMaxTimestamp(maxTimestamp);
SSTableMetadata originalMetadata = collector.finalizeMetadata(RandomPartitioner.class.getCanonicalName());
@@ -68,7 +70,9 @@ public class SSTableMetadataSerializerTest
assert stats.estimatedColumnCount.equals(columnCounts);
assert stats.replayPosition.equals(originalMetadata.replayPosition);
assert stats.replayPosition.equals(rp);
+ assert stats.minTimestamp == minTimestamp;
assert stats.maxTimestamp == maxTimestamp;
+ assert stats.minTimestamp == originalMetadata.minTimestamp;
assert stats.maxTimestamp == originalMetadata.maxTimestamp;
assert RandomPartitioner.class.getCanonicalName().equals(stats.partitioner);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9f4786/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
index 83e0470..1af3074 100644
--- a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
+++ b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
@@ -36,7 +36,7 @@ public class BootstrapTest extends SchemaLoader
@Test
public void testGetNewNames() throws IOException
{
- Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ia-500-Data.db").toString());
+ Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-ib-500-Data.db").toString());
// assert !desc.isLatestVersion; // minimum compatible version -- for now it is the latest as well
PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(Pair.create(0L, 1L)), OperationType.BOOTSTRAP);