You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/11/15 22:26:16 UTC
git commit: SSTable/SSTableReader cleanup
Updated Branches:
refs/heads/trunk fab27bd59 -> f388c9d69
SSTable/SSTableReader cleanup
patch by yukim; reviewed by jbellis for CASSANDRA-6355
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f388c9d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f388c9d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f388c9d6
Branch: refs/heads/trunk
Commit: f388c9d69b855f0c3b146864717a971034fd3dc5
Parents: fab27bd
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Nov 15 15:23:55 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Nov 15 15:23:55 2013 -0600
----------------------------------------------------------------------
.../cassandra/db/CollationController.java | 5 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
.../db/compaction/CompactionController.java | 3 +-
.../cassandra/db/compaction/CompactionTask.java | 6 +-
.../compaction/LeveledCompactionStrategy.java | 3 +-
.../db/compaction/LeveledManifest.java | 14 ++--
.../cassandra/db/compaction/Upgrader.java | 9 +--
.../apache/cassandra/io/sstable/Component.java | 6 --
.../cassandra/io/sstable/KeyIterator.java | 2 +-
.../apache/cassandra/io/sstable/SSTable.java | 55 ++-------------
.../cassandra/io/sstable/SSTableMetadata.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 73 ++++++++++++++------
.../cassandra/io/sstable/SSTableWriter.java | 12 ++--
.../io/util/DataIntegrityMetadata.java | 4 +-
.../LongLeveledCompactionStrategyTest.java | 3 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 2 +-
.../LeveledCompactionStrategyTest.java | 5 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
18 files changed, 88 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 758d523..9896fde 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.tracing.Tracing;
@@ -99,7 +98,7 @@ public class CollationController
QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
/* add the SSTables on disk */
- Collections.sort(view.sstables, SSTable.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
// read sorted sstables
long mostRecentRowTombstone = Long.MIN_VALUE;
@@ -219,7 +218,7 @@ public class CollationController
* In othere words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination
* in one pass, and minimize the number of sstables for which we read a rowTombstone.
*/
- Collections.sort(view.sstables, SSTable.maxTimestampComparator);
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
List<SSTableReader> skippedSSTables = null;
long mostRecentRowTombstone = Long.MIN_VALUE;
long minTimestamp = Long.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 73704d4..1b8a1bf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -422,7 +422,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Descriptor desc = sstableFiles.getKey();
Set<Component> components = sstableFiles.getValue();
- if (components.contains(Component.COMPACTED_MARKER) || desc.temporary)
+ if (desc.temporary)
{
SSTable.delete(desc, components);
continue;
@@ -1010,7 +1010,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
if (operation != OperationType.CLEANUP || isIndex())
{
- return SSTable.getTotalBytes(sstables);
+ return SSTableReader.getTotalBytes(sstables);
}
// cleanup size estimation only counts bytes for keys local to this node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 201cd0a..dc7730c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -118,7 +117,7 @@ public class CompactionController
// we still need to keep candidates that might shadow something in a
// non-candidate sstable. And if we remove a sstable from the candidates, we
// must take it's timestamp into account (hence the sorting below).
- Collections.sort(candidates, SSTable.maxTimestampComparator);
+ Collections.sort(candidates, SSTableReader.maxTimestampComparator);
Iterator<SSTableReader> iterator = candidates.iterator();
while (iterator.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index bc419ad..f4cc500 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -118,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
long totalkeysWritten = 0;
long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
- long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize());
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
@@ -244,8 +244,8 @@ public class CompactionTask extends AbstractCompactionTask
// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
+ long startsize = SSTableReader.getTotalBytes(toCompact);
+ long endsize = SSTableReader.getTotalBytes(sstables);
double ratio = (double) endsize / (double) startsize;
StringBuilder builder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index e992003..3eb5e91 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
@@ -234,7 +233,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
}
totalLength = length;
- Collections.sort(this.sstables, SSTable.sstableComparator);
+ Collections.sort(this.sstables, SSTableReader.sstableComparator);
sstableIterator = this.sstables.iterator();
assert sstableIterator.hasNext(); // caller should check intersecting first
currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 63ad2e4..2b79493 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -151,13 +151,13 @@ public class LeveledManifest
minLevel = Math.min(minLevel, ssTableReader.getSSTableLevel());
add(ssTableReader);
}
- lastCompactedKeys[minLevel] = SSTable.sstableOrdering.max(added).last;
+ lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
public synchronized void repairOverlappingSSTables(int level)
{
SSTableReader previous = null;
- Collections.sort(generations[level], SSTable.sstableComparator);
+ Collections.sort(generations[level], SSTableReader.sstableComparator);
List<SSTableReader> outOfOrderSSTables = new ArrayList<SSTableReader>();
for (SSTableReader current : generations[level])
{
@@ -264,7 +264,7 @@ public class LeveledManifest
// we want to calculate score excluding compacting ones
Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting());
- double score = (double)SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
+ double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
logger.debug("Compaction score for level {} is {}", i, score);
if (score > 1.001)
@@ -454,7 +454,7 @@ public class LeveledManifest
}
// leave everything in L0 if we didn't end up with a full sstable's worth of data
- if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
+ if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes)
{
// add sstables from L1 that overlap candidates
// if the overlapping ones are already busy in a compaction, leave it out.
@@ -468,7 +468,7 @@ public class LeveledManifest
}
// for non-L0 compactions, pick up where we left off last time
- Collections.sort(generations[level], SSTable.sstableComparator);
+ Collections.sort(generations[level], SSTableReader.sstableComparator);
int start = 0; // handles case where the prior compaction touched the very last range
for (int i = 0; i < generations[level].size(); i++)
{
@@ -499,7 +499,7 @@ public class LeveledManifest
private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
{
List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates);
- Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator);
+ Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
return ageSortedCandidates;
}
@@ -557,7 +557,7 @@ public class LeveledManifest
}
int newLevel;
- if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTable.getTotalBytes(sstables) < maxSSTableSizeInBytes)
+ if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTableReader.getTotalBytes(sstables) < maxSSTableSizeInBytes)
{
newLevel = 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index b98c2ae..383ff00 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -25,13 +25,6 @@ import com.google.common.base.Throwables;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.AbstractCompactionIterable;
-import org.apache.cassandra.db.compaction.CompactionIterable;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.CompactionTask;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.OutputHandler;
@@ -63,7 +56,7 @@ public class Upgrader
this.strategy = cfs.getCompactionStrategy();
long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade, cfs.metadata));
- long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize());
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize());
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 599e0ba..4635251 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -43,10 +43,6 @@ public class Component
PRIMARY_INDEX("Index.db"),
// serialized bloom filter for the row keys in the sstable
FILTER("Filter.db"),
- // 0-length file that is created when an sstable is ready to be deleted
- // @deprecated: deletion of compacted file is based on the lineag information stored in the compacted sstabl
- // metadata. This ensure we can guarantee never using a sstable and some of its parents, even in case of failure.
- COMPACTED_MARKER("Compacted"),
// file to hold information about uncompressed data length, chunk offsets etc.
COMPRESSION_INFO("CompressionInfo.db"),
// statistical metadata about the content of the sstable
@@ -81,7 +77,6 @@ public class Component
public final static Component DATA = new Component(Type.DATA);
public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX);
public final static Component FILTER = new Component(Type.FILTER);
- public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER);
public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO);
public final static Component STATS = new Component(Type.STATS);
public final static Component DIGEST = new Component(Type.DIGEST);
@@ -133,7 +128,6 @@ public class Component
case DATA: component = Component.DATA; break;
case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break;
case FILTER: component = Component.FILTER; break;
- case COMPACTED_MARKER: component = Component.COMPACTED_MARKER; break;
case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
case STATS: component = Component.STATS; break;
case DIGEST: component = Component.DIGEST; break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index f4f7ee5..0c36f62 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -35,7 +35,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
public KeyIterator(Descriptor desc)
{
- File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ File path = new File(desc.filenameFor(Component.PRIMARY_INDEX));
in = RandomAccessReader.open(path);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index e469b33..c13c423 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
-import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.slf4j.Logger;
@@ -57,27 +56,10 @@ public abstract class SSTable
{
static final Logger logger = LoggerFactory.getLogger(SSTable.class);
- // TODO: replace with 'Component' objects
- public static final String COMPONENT_DATA = Component.Type.DATA.repr;
- public static final String COMPONENT_INDEX = Component.Type.PRIMARY_INDEX.repr;
- public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
- public static final String COMPONENT_STATS = Component.Type.STATS.repr;
- public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
-
public static final String TEMPFILE_MARKER = "tmp";
public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
- public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- long ts1 = o1.getMaxTimestamp();
- long ts2 = o2.getMaxTimestamp();
- return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
- }
- };
-
public final Descriptor descriptor;
protected final Set<Component> components;
public final CFMetaData metadata;
@@ -101,26 +83,13 @@ public abstract class SSTable
assert partitioner != null;
this.descriptor = descriptor;
- Set<Component> dataComponents = new HashSet<Component>(components);
- for (Component component : components)
- assert component.type != Component.Type.COMPACTED_MARKER;
-
+ Set<Component> dataComponents = new HashSet<>(components);
this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
- this.components = new CopyOnWriteArraySet<Component>(dataComponents);
+ this.components = new CopyOnWriteArraySet<>(dataComponents);
this.metadata = metadata;
this.partitioner = partitioner;
}
- public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return o1.first.compareTo(o2.first);
- }
- };
-
- public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
-
/**
* We use a ReferenceQueue to manage deleting files that have been compacted
* and for which no more SSTable references exist. But this is not guaranteed
@@ -139,15 +108,11 @@ public abstract class SSTable
FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
for (Component component : components)
{
- if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY))
+ if (component.equals(Component.DATA) || component.equals(Component.SUMMARY))
continue;
FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
- // remove the COMPACTED_MARKER component last if it exists
- // Note: newly created sstable should not have a marker, but we keep this for now to make sure
- // we don't leave older marker around
- FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
FileUtils.delete(desc.filenameFor(Component.SUMMARY));
logger.debug("Deleted {}", desc);
@@ -167,12 +132,12 @@ public abstract class SSTable
public String getFilename()
{
- return descriptor.filenameFor(COMPONENT_DATA);
+ return descriptor.filenameFor(Component.DATA);
}
public String getIndexFilename()
{
- return descriptor.filenameFor(COMPONENT_INDEX);
+ return descriptor.filenameFor(Component.PRIMARY_INDEX);
}
public String getColumnFamilyName()
@@ -262,16 +227,6 @@ public abstract class SSTable
return estimatedRows;
}
- public static long getTotalBytes(Iterable<SSTableReader> sstables)
- {
- long sum = 0;
- for (SSTableReader sstable : sstables)
- {
- sum += sstable.onDiskLength();
- }
- return sum;
- }
-
public long bytesOnDisk()
{
long bytes = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index 140e08b..8ddfdd7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -435,7 +435,7 @@ public class SSTableMetadata
public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
{
logger.debug("Load metadata for {}", descriptor);
- File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
+ File statsFile = new File(descriptor.filenameFor(Component.STATS));
if (!statsFile.exists())
{
logger.debug("No sstable stats for {}", descriptor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c5b61d9..7fd9ca6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
@@ -67,8 +68,28 @@ public class SSTableReader extends SSTable implements Closeable
private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
+ public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ long ts1 = o1.getMaxTimestamp();
+ long ts2 = o2.getMaxTimestamp();
+ return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
+ }
+ };
+
+ public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return o1.first.compareTo(o2.first);
+ }
+ };
+
+ public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+
/**
- * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound
+ * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
* to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
* later than maxDataAge.
*
@@ -168,7 +189,7 @@ public class SSTableReader extends SSTable implements Closeable
SegmentedFile.Builder dbuilder = sstable.compression
? new CompressedSegmentedFile.Builder()
: new BufferedSegmentedFile.Builder();
- if (!loadSummary(sstable, ibuilder, dbuilder, sstable.metadata))
+ if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
sstable.buildSummary(false, ibuilder, dbuilder, false);
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
@@ -218,7 +239,7 @@ public class SSTableReader extends SSTable implements Closeable
assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
- logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
@@ -247,7 +268,7 @@ public class SSTableReader extends SSTable implements Closeable
final CFMetaData metadata,
final IPartitioner partitioner)
{
- final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
+ final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
@@ -365,6 +386,16 @@ public class SSTableReader extends SSTable implements Closeable
this.bf = bloomFilter;
}
+ public static long getTotalBytes(Iterable<SSTableReader> sstables)
+ {
+ long sum = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ sum += sstable.onDiskLength();
+ }
+ return sum;
+ }
+
/**
* Clean up all opened resources.
*
@@ -441,18 +472,18 @@ public class SSTableReader extends SSTable implements Closeable
? SegmentedFile.getCompressedBuilder()
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
+ boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
if (recreateBloomFilter || !summaryLoaded)
buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
+ saveSummary(ibuilder, dbuilder);
}
- private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
- {
+ private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+ {
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
@@ -505,27 +536,27 @@ public class SSTableReader extends SSTable implements Closeable
last = getMinimalKey(last);
}
- public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
+ public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
{
- File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
- if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (!descriptor.version.offHeapSummaries || !summariesFile.exists())
return false;
DataInputStream iStream = null;
try
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
- reader.indexSummary = IndexSummary.serializer.deserialize(iStream, reader.partitioner);
- if (reader.indexSummary.getIndexInterval() != metadata.getIndexInterval())
+ indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner);
+ if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
{
iStream.close();
logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.",
- reader.toString(), reader.indexSummary.getIndexInterval(), metadata.getIndexInterval());
+ toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval());
FileUtils.deleteWithConfirm(summariesFile);
return false;
}
- reader.first = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
- reader.last = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
ibuilder.deserializeBounds(iStream);
dbuilder.deserializeBounds(iStream);
}
@@ -544,9 +575,9 @@ public class SSTableReader extends SSTable implements Closeable
return true;
}
- public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
- File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (summariesFile.exists())
summariesFile.delete();
@@ -554,9 +585,9 @@ public class SSTableReader extends SSTable implements Closeable
try
{
oStream = new DataOutputStream(new FileOutputStream(summariesFile));
- IndexSummary.serializer.serialize(reader.indexSummary, oStream);
- ByteBufferUtil.writeWithLength(reader.first.key, oStream);
- ByteBufferUtil.writeWithLength(reader.last.key, oStream);
+ IndexSummary.serializer.serialize(indexSummary, oStream);
+ ByteBufferUtil.writeWithLength(first.key, oStream);
+ ByteBufferUtil.writeWithLength(last.key, oStream);
ibuilder.serializeBounds(oStream);
dbuilder.serializeBounds(oStream);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 70c0b42..b5d50cf 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -313,8 +313,8 @@ public class SSTableWriter extends SSTable
SSTableMetadata sstableMetadata = p.right;
// finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
- SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
+ SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
+ SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
SSTableReader sstable = SSTableReader.internalOpen(newdesc,
components,
metadata,
@@ -328,7 +328,7 @@ public class SSTableWriter extends SSTable
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
// try to save the summaries to disk
- SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder);
+ sstable.saveSummary(iwriter.builder, dbuilder);
iwriter = null;
dbuilder = null;
return sstable;
@@ -355,7 +355,7 @@ public class SSTableWriter extends SSTable
private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata, Set<Integer> ancestors)
{
- SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
try
{
SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream);
@@ -411,7 +411,7 @@ public class SSTableWriter extends SSTable
IndexWriter(long keyCount)
{
- indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)),
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
!metadata.populateIoCacheOnFlush());
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval());
@@ -446,7 +446,7 @@ public class SSTableWriter extends SSTable
{
if (components.contains(Component.FILTER))
{
- String path = descriptor.filenameFor(SSTable.COMPONENT_FILTER);
+ String path = descriptor.filenameFor(Component.FILTER);
try
{
// bloom filter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index f334d08..0606941 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -145,10 +145,10 @@ public class DataIntegrityMetadata
byte[] bytes = digest.digest();
if (bytes == null)
return;
- SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true);
+ SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(Component.DIGEST)), true);
// Writting output compatible with sha1sum
Descriptor newdesc = descriptor.asTemporary(false);
- String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator));
+ String[] tmp = newdesc.filenameFor(Component.DATA).split(Pattern.quote(File.separator));
String dataFileName = tmp[tmp.length - 1];
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index c6b6eb0..0eb44d0 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -106,7 +105,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
{
List<SSTableReader> sstables = manifest.getLevel(level);
// score check
- assert (double) SSTable.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00;
+ assert (double) SSTableReader.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00;
// overlap check for levels greater than 0
if (level > 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 32bc7df..7f7d5c9 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -938,7 +938,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
cfs.clearUnsafe();
assertEquals(0, cfs.getSSTables().size());
- new File(ssTables.iterator().next().descriptor.filenameFor(SSTable.COMPONENT_STATS)).delete();
+ new File(ssTables.iterator().next().descriptor.filenameFor(Component.STATS)).delete();
cfs.loadNewSSTables();
// Add another column with a lower timestamp
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index b60f6d9..a008de1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.UUID;
@@ -39,7 +37,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
@@ -145,7 +142,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
scanner.next();
// scanner.getCurrentPosition should be equal to total bytes of L1 sstables
- assert scanner.getCurrentPosition() == SSTable.getTotalBytes(sstables);
+ assert scanner.getCurrentPosition() == SSTableReader.getTotalBytes(sstables);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 36d8fbe..b771e72 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -278,7 +278,7 @@ public class SSTableReaderTest extends SchemaLoader
SegmentedFile.Builder dbuilder = sstable.compression
? SegmentedFile.getCompressedBuilder()
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- SSTableReader.saveSummary(sstable, ibuilder, dbuilder);
+ sstable.saveSummary(ibuilder, dbuilder);
SSTableReader reopened = SSTableReader.open(sstable.descriptor);
assert reopened.first.token instanceof LocalToken;