You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/20 20:53:02 UTC
[2/2] cassandra git commit: Give compaction strategies more control
over sstable creation
Give compaction strategies more control over sstable creation
Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-8671
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed27277
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed27277
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed27277
Branch: refs/heads/cassandra-3.0
Commit: 9ed2727739c73d64086d09a86a407a77390f081a
Parents: 0d86645
Author: Blake Eggleston <bd...@gmail.com>
Authored: Thu Aug 6 10:19:55 2015 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Aug 20 20:47:40 2015 +0200
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 73 +++++++++---
.../org/apache/cassandra/db/Directories.java | 42 +++++--
src/java/org/apache/cassandra/db/Keyspace.java | 5 +
src/java/org/apache/cassandra/db/Memtable.java | 32 +++--
.../compaction/AbstractCompactionStrategy.java | 24 +++-
.../db/compaction/AbstractCompactionTask.java | 3 +-
.../db/compaction/CompactionManager.java | 6 +-
.../compaction/CompactionStrategyManager.java | 40 +++++--
.../cassandra/db/compaction/CompactionTask.java | 22 ++--
.../db/compaction/LeveledCompactionTask.java | 6 +-
.../db/compaction/SSTableSplitter.java | 3 +-
.../cassandra/db/compaction/Scrubber.java | 3 +-
.../SizeTieredCompactionStrategy.java | 4 +-
.../writers/CompactionAwareWriter.java | 53 ++++++---
.../writers/DefaultCompactionWriter.java | 32 ++---
.../writers/MajorLeveledCompactionWriter.java | 46 ++++----
.../writers/MaxSSTableSizeWriter.java | 45 ++++---
.../SplittingSizeTieredCompactionWriter.java | 52 ++++-----
.../db/lifecycle/LifecycleTransaction.java | 9 ++
.../apache/cassandra/db/lifecycle/Tracker.java | 34 +++---
.../org/apache/cassandra/db/lifecycle/View.java | 4 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 11 +-
.../io/sstable/SSTableMultiWriter.java | 54 +++++++++
.../cassandra/io/sstable/SSTableTxnWriter.java | 43 +++++--
.../io/sstable/SimpleSSTableMultiWriter.java | 116 +++++++++++++++++++
.../notifications/SSTableAddedNotification.java | 4 +-
.../cassandra/streaming/StreamReader.java | 22 ++--
.../cassandra/streaming/StreamReceiveTask.java | 22 ++--
.../compress/CompressedStreamReader.java | 8 +-
.../streaming/messages/IncomingFileMessage.java | 7 +-
.../cassandra/tools/SSTableExpiredBlockers.java | 3 +-
.../cassandra/tools/SSTableLevelResetter.java | 2 +-
.../cassandra/tools/SSTableOfflineRelevel.java | 5 +-
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../cassandra/tools/StandaloneUpgrader.java | 2 +-
.../cassandra/tools/StandaloneVerifier.java | 7 +-
.../db/compaction/LongCompactionsTest.java | 6 +-
test/unit/org/apache/cassandra/MockSchema.java | 2 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 12 +-
.../db/compaction/AntiCompactionTest.java | 10 +-
.../compaction/CompactionAwareWriterTest.java | 8 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../db/lifecycle/RealTransactionsTest.java | 8 +-
.../cassandra/db/lifecycle/TrackerTest.java | 19 +--
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../io/sstable/BigTableWriterTest.java | 4 +-
.../io/sstable/CQLSSTableWriterClientTest.java | 2 +
.../io/sstable/SSTableRewriterTest.java | 10 +-
.../cassandra/io/sstable/SSTableUtils.java | 25 ++--
.../org/apache/cassandra/schema/DefsTest.java | 6 +-
51 files changed, 651 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a12de0a..b199c77 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.TableMetrics.Sampler;
import org.apache.cassandra.metrics.TableMetrics;
@@ -75,6 +76,33 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
+ // the directories used to load sstables on cfs instantiation
+ private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories;
+
+ /**
+ * a hook to add additional directories to initialDirectories.
+ * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup
+ */
+ public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories)
+ {
+ assert newDirectories != null;
+
+ Set<Directories.DataDirectory> existing = Sets.newHashSet(initialDirectories);
+
+ List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories);
+ for (Directories.DataDirectory directory: newDirectories)
+ {
+ if (!existing.contains(directory))
+ {
+ replacementList.add(directory);
+ }
+ }
+
+ Directories.DataDirectory[] replacementArray = new Directories.DataDirectory[replacementList.size()];
+ replacementList.toArray(replacementArray);
+ initialDirectories = replacementArray;
+ }
+
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
@@ -164,7 +192,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private volatile DefaultInteger maxCompactionThreshold;
private final CompactionStrategyManager compactionStrategyManager;
- public final Directories directories;
+ private volatile Directories directories;
public final TableMetrics metric;
public volatile long sampleLatencyNanos;
@@ -189,6 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
compactionStrategyManager.maybeReload(metadata);
+ directories = compactionStrategyManager.getDirectories();
scheduleFlush();
@@ -330,6 +359,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean loadSSTables,
boolean registerBookkeeping)
{
+ assert directories != null;
assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
this.keyspace = keyspace;
@@ -363,6 +393,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// compaction strategy should be created after the CFS has been prepared
this.compactionStrategyManager = new CompactionStrategyManager(this);
+ this.directories = this.compactionStrategyManager.getDirectories();
if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
{
@@ -426,6 +457,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ public Directories getDirectories()
+ {
+ return directories;
+ }
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ {
+ MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+ {
+ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+ }
+
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
public void invalidate()
{
@@ -499,7 +546,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean loadSSTables)
{
// get the max generation number, to prevent generation conflicts
- Directories directories = new Directories(metadata);
+ Directories directories = new Directories(metadata, initialDirectories);
Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
List<Integer> generations = new ArrayList<Integer>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -633,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
currentDescriptors.add(sstable.descriptor);
Set<SSTableReader> newSSTables = new HashSet<>();
- Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+ Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor descriptor = entry.getKey();
@@ -1378,9 +1425,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
}
- void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- compactionStrategyManager.replaceFlushed(memtable, sstable);
+ compactionStrategyManager.replaceFlushed(memtable, sstables);
}
public boolean isValid()
@@ -1580,7 +1627,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
{
- final File manifestFile = directories.getSnapshotManifestFile(snapshotName);
+ final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName);
try
{
@@ -1602,7 +1649,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private void createEphemeralSnapshotMarkerFile(final String snapshot)
{
- final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot);
+ final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);
try
{
@@ -1635,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Map<Integer, SSTableReader> active = new HashMap<>();
for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
active.put(sstable.descriptor.generation, sstable);
- Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
+ Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
Refs<SSTableReader> refs = new Refs<>();
try
{
@@ -1692,12 +1739,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public boolean snapshotExists(String snapshotName)
{
- return directories.snapshotExists(snapshotName);
+ return getDirectories().snapshotExists(snapshotName);
}
public long getSnapshotCreationTime(String snapshotName)
{
- return directories.snapshotCreationTime(snapshotName);
+ return getDirectories().snapshotCreationTime(snapshotName);
}
/**
@@ -1708,7 +1755,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public void clearSnapshot(String snapshotName)
{
- List<File> snapshotDirs = directories.getCFDirectories();
+ List<File> snapshotDirs = getDirectories().getCFDirectories();
Directories.clearSnapshot(snapshotName, snapshotDirs);
}
/**
@@ -1718,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public Map<String, Pair<Long,Long>> getSnapshotDetails()
{
- return directories.getSnapshotDetails();
+ return getDirectories().getSnapshotDetails();
}
/**
@@ -2251,7 +2298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long trueSnapshotsSize()
{
- return directories.trueSnapshotsSize();
+ return getDirectories().trueSnapshotsSize();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index fa01269..90d2085 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -178,30 +178,36 @@ public class Directories
}
private final CFMetaData metadata;
+ private final DataDirectory[] paths;
private final File[] dataPaths;
+ public Directories(final CFMetaData metadata)
+ {
+ this(metadata, dataDirectories);
+ }
/**
* Create Directories of given ColumnFamily.
* SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time.
*
* @param metadata metadata of ColumnFamily
*/
- public Directories(final CFMetaData metadata)
+ public Directories(final CFMetaData metadata, DataDirectory[] paths)
{
this.metadata = metadata;
+ this.paths = paths;
String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;
- this.dataPaths = new File[dataDirectories.length];
+ this.dataPaths = new File[paths.length];
// If upgraded from version less than 2.1, use existing directories
String oldSSTableRelativePath = join(metadata.ksName, cfName);
- for (int i = 0; i < dataDirectories.length; ++i)
+ for (int i = 0; i < paths.length; ++i)
{
// check if old SSTable directory exists
- dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath);
+ dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
}
boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
{
@@ -214,13 +220,13 @@ public class Directories
{
// use 2.1+ style
String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
- for (int i = 0; i < dataDirectories.length; ++i)
- dataPaths[i] = new File(dataDirectories[i].location, newSSTableRelativePath);
+ for (int i = 0; i < paths.length; ++i)
+ dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
}
// if index, then move to its own directory
if (indexNameWithDot != null)
{
- for (int i = 0; i < dataDirectories.length; ++i)
+ for (int i = 0; i < paths.length; ++i)
dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
}
@@ -327,7 +333,7 @@ public class Directories
// pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
boolean tooBig = false;
- for (DataDirectory dataDir : dataDirectories)
+ for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
{
@@ -393,7 +399,7 @@ public class Directories
long writeSize = expectedTotalWriteSize / estimatedSSTables;
long totalAvailable = 0L;
- for (DataDirectory dataDir : dataDirectories)
+ for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
continue;
@@ -481,6 +487,24 @@ public class Directories
{
return location.getUsableSpace();
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ DataDirectory that = (DataDirectory) o;
+
+ return location.equals(that.location);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return location.hashCode();
+ }
}
static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..c2613fe 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -210,6 +210,11 @@ public class Keyspace
return cfs;
}
+ public boolean hasColumnFamilyStore(UUID id)
+ {
+ return columnFamilyStores.containsKey(id);
+ }
+
/**
* Take a snapshot of the specific column family, or the entire set of column families
* if columnFamily is null with a given timestamp
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1b30fc7..4a54666 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -345,22 +344,22 @@ public class Memtable implements Comparable<Memtable>
{
long writeSize = getExpectedWriteSize();
Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+ File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
assert sstableDirectory != null : "Flush task is not bound to any disk";
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
+ Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
+ cfs.replaceFlushed(Memtable.this, sstables);
}
protected Directories getDirectories()
{
- return cfs.directories;
+ return cfs.getDirectories();
}
- private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+ private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
{
logger.info("Writing {}", Memtable.this.toString());
- SSTableReader ssTable;
+ Collection<SSTableReader> ssTables;
try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
boolean trackContention = logger.isDebugEnabled();
@@ -397,20 +396,20 @@ public class Memtable implements Comparable<Memtable>
context));
// sstables should contain non-repaired data.
- ssTable = writer.finish(true);
+ ssTables = writer.finish(true);
}
else
{
logger.info("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
writer.getFilename(), context);
writer.abort();
- ssTable = null;
+ ssTables = null;
}
if (heavilyContendedRowCount > 0)
logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTable;
+ return ssTables;
}
}
@@ -423,13 +422,12 @@ public class Memtable implements Comparable<Memtable>
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
return new SSTableTxnWriter(txn,
- SSTableWriter.create(Descriptor.fromFilename(filename),
- (long)partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- cfs.metadata,
- sstableMetadataCollector,
- new SerializationHeader(cfs.metadata, columns, stats),
- txn));
+ cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)partitions.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(cfs.metadata, columns, stats),
+ txn));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index d9c9ea3..721fd70 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -25,7 +25,12 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -113,6 +119,11 @@ public abstract class AbstractCompactionStrategy
}
}
+ public Directories getDirectories()
+ {
+ return cfs.getDirectories();
+ }
+
/**
* For internal, temporary suspension of background compactions so that we can do exceptional
* things like truncate or major compaction
@@ -222,12 +233,12 @@ public abstract class AbstractCompactionStrategy
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
- * @param sstable the written sstable. can be null if the memtable was clean.
+ * @param sstables the written sstables. can be null or empty if the memtable was clean.
*/
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- cfs.getTracker().replaceFlushed(memtable, sstable);
- if (sstable != null)
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
@@ -493,4 +504,9 @@ public abstract class AbstractCompactionStrategy
groupedSSTables.add(currGroup);
return groupedSSTables;
}
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+ {
+ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 3bf224e..155bf2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
transaction.close();
}
}
- public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
+ public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 8aa16d5..66f9ed5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -594,7 +594,7 @@ public class CompactionManager implements CompactionManagerMBean
}
// group by keyspace/columnfamily
ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
- descriptors.put(cfs, cfs.directories.find(new File(filename.trim()).getName()));
+ descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName()));
}
List<Future<?>> futures = new ArrayList<>();
@@ -817,7 +817,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
+ File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -1192,7 +1192,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Anticompacting {}", anticompactionGroup);
Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
- File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+ File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
long repairedKeyCount = 0;
long unrepairedKeyCount = 0;
int nowInSec = FBUtilities.nowInSeconds();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f5097af..47c8de8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
/**
* Manages the compaction strategies.
@@ -181,10 +187,10 @@ public class CompactionStrategyManager implements INotificationConsumer
startup();
}
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- cfs.getTracker().replaceFlushed(memtable, sstable);
- if (sstable != null)
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
@@ -235,16 +241,24 @@ public class CompactionStrategyManager implements INotificationConsumer
return repaired.shouldDefragment();
}
+ public Directories getDirectories()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.getDirectories();
+ }
public synchronized void handleNotification(INotification notification, Object sender)
{
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
- if (flushedNotification.added.isRepaired())
- repaired.addSSTable(flushedNotification.added);
- else
- unrepaired.addSSTable(flushedNotification.added);
+ for (SSTableReader sstable : flushedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repaired.addSSTable(sstable);
+ else
+ unrepaired.addSSTable(sstable);
+ }
}
else if (notification instanceof SSTableListChangedNotification)
{
@@ -484,4 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
{
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ {
+ if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ {
+ return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ else
+ {
+ return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0bd6aae..1d96324 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -29,9 +28,9 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -43,7 +42,6 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -51,8 +49,8 @@ public class CompactionTask extends AbstractCompactionTask
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
protected final int gcBefore;
- private final boolean offline;
- private final boolean keepOriginals;
+ protected final boolean offline;
+ protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
private CompactionExecutorStatsCollector collector;
@@ -154,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
{
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
- List<SSTableReader> newSStables;
+ Collection<SSTableReader> newSStables;
long[] mergedRowCounts;
@@ -173,7 +171,7 @@ public class CompactionTask extends AbstractCompactionTask
if (!controller.cfs.getCompactionStrategyManager().isActive)
throw new CompactionInterruptedException(ci.getCompactionInfo());
- try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+ try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
{
estimatedKeys = writer.estimatedKeys();
while (ci.hasNext())
@@ -228,10 +226,11 @@ public class CompactionTask extends AbstractCompactionTask
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction transaction,
Set<SSTableReader> nonExpiredSSTables)
{
- return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals);
+ return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
}
public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
@@ -252,6 +251,11 @@ public class CompactionTask extends AbstractCompactionTask
return mergeSummary.toString();
}
+ protected Directories getDirectories()
+ {
+ return cfs.getDirectories();
+ }
+
public static long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
{
long minRepairedAt= Long.MAX_VALUE;
@@ -264,7 +268,7 @@ public class CompactionTask extends AbstractCompactionTask
protected void checkAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize)
{
- while (!cfs.directories.hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
{
if (!reduceScopeForLimitedSpace())
throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 11d113d..eeb3615 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
@@ -42,12 +43,13 @@ public class LeveledCompactionTask extends CompactionTask
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
if (majorCompaction)
- return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
- return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
+ return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
+ return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 1944364..3655a37 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -75,10 +75,11 @@ public class SSTableSplitter {
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
- return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
+ return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 747b956..c437832 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.UUIDGen;
@@ -106,7 +105,7 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
// Calculate the expected compacted filesize
- this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
+ this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
if (destination == null)
throw new IOException("disk full");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 2353aa3..05f446c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -342,10 +343,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
- return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
+ return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, nonExpiredSSTables);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 50e5a96..abc4107 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,14 +18,17 @@
package org.apache.cassandra.db.compaction.writers;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -38,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
{
protected final ColumnFamilyStore cfs;
+ protected final Directories directories;
protected final Set<SSTableReader> nonExpiredSSTables;
protected final long estimatedTotalKeys;
protected final long maxAge;
@@ -45,35 +49,25 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final LifecycleTransaction txn;
protected final SSTableRewriter sstableWriter;
+ private boolean isInitialized = false;
public CompactionAwareWriter(ColumnFamilyStore cfs,
- LifecycleTransaction txn,
- Set<SSTableReader> nonExpiredSSTables)
- {
- this(cfs, txn, nonExpiredSSTables, false, false);
- }
-
- public CompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
boolean offline,
boolean keepOriginals)
{
this.cfs = cfs;
+ this.directories = directories;
this.nonExpiredSSTables = nonExpiredSSTables;
this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
this.txn = txn;
this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals);
- }
- /**
- * Writes a partition in an implementation specific way
- * @param partition the partition to append
- * @return true if the partition was written, false otherwise
- */
- public abstract boolean append(UnfilteredRowIterator partition);
+ }
@Override
protected Throwable doAbort(Throwable accumulate)
@@ -98,7 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
* @return all the written sstables sstables
*/
@Override
- public List<SSTableReader> finish()
+ public Collection<SSTableReader> finish()
{
super.finish();
return sstableWriter.finished();
@@ -112,12 +106,39 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return estimatedTotalKeys;
}
+ public final boolean append(UnfilteredRowIterator partition)
+ {
+ maybeSwitchWriter(partition.partitionKey());
+ return realAppend(partition);
+ }
+
+ protected abstract boolean realAppend(UnfilteredRowIterator partition);
+
+ /**
+ * Guaranteed to be called before the first call to realAppend.
+ * @param key
+ */
+ protected void maybeSwitchWriter(DecoratedKey key)
+ {
+ if (!isInitialized)
+ switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
+ isInitialized = true;
+ }
+
+ /**
+ * Implementations of this method should finish the current sstable writer and start writing to this directory.
+ *
+ * Called once before starting to append and then whenever we see a need to start writing to another directory.
+ * @param directory
+ */
+ protected abstract void switchCompactionLocation(Directories.DataDirectory directory);
+
/**
* The directories we can write to
*/
public Directories getDirectories()
{
- return cfs.directories;
+ return directories;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index eb55d20..8b90224 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -40,20 +40,28 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
- public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, txn, nonExpiredSSTables, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, false, false);
}
@SuppressWarnings("resource")
- public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+ {
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+ }
+
+ @Override
+ public boolean realAppend(UnfilteredRowIterator partition)
+ {
+ return sstableWriter.append(partition) != null;
+ }
+
+ @Override
+ protected void switchCompactionLocation(Directories.DataDirectory directory)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
- logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
- long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
@@ -64,12 +72,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
}
@Override
- public boolean append(UnfilteredRowIterator partition)
- {
- return sstableWriter.append(partition) != null;
- }
-
- @Override
public long estimatedKeys()
{
return estimatedTotalKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 73ce216..6d191f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,43 +48,32 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
private int sstablesWritten = 0;
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize)
{
- this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false);
}
@SuppressWarnings("resource")
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
boolean offline,
boolean keepOriginals)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
this.maxSSTableSize = maxSSTableSize;
this.allSSTables = txn.originals();
expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
- long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- keysPerSSTable,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
}
@Override
@SuppressWarnings("resource")
- public boolean append(UnfilteredRowIterator partition)
+ public boolean realAppend(UnfilteredRowIterator partition)
{
long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
RowIndexEntry rie = sstableWriter.append(partition);
@@ -98,19 +88,25 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
}
averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- averageEstimatedKeysPerSSTable,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
+ switchCompactionLocation(getWriteDirectory(expectedWriteSize));
partitionsWritten = 0;
sstablesWritten++;
}
return rie != null;
}
+
+ public void switchCompactionLocation(Directories.DataDirectory directory)
+ {
+ File sstableDirectory = getDirectories().getLocationForDisk(directory);
+ @SuppressWarnings("resource")
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ averageEstimatedKeysPerSSTable,
+ minRepairedAt,
+ cfs.metadata,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
+ sstableWriter.switchWriter(writer);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 241af0d..142fe87 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,10 +17,10 @@
*/
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -40,16 +40,18 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
private final Set<SSTableReader> allSSTables;
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
int level)
{
- this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
}
@SuppressWarnings("resource")
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
@@ -57,7 +59,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
boolean offline,
boolean keepOriginals)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
this.allSSTables = txn.originals();
this.level = level;
this.maxSSTableSize = maxSSTableSize;
@@ -65,37 +67,30 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
expectedWriteSize = Math.min(maxSSTableSize, totalSize);
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+ }
+
+ @Override
+ public boolean realAppend(UnfilteredRowIterator partition)
+ {
+ RowIndexEntry rie = sstableWriter.append(partition);
+ if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+ switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+ return rie != null;
+ }
+
+ public void switchCompactionLocation(Directories.DataDirectory location)
+ {
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
- sstableWriter.switchWriter(writer);
- }
- @Override
- public boolean append(UnfilteredRowIterator partition)
- {
- RowIndexEntry rie = sstableWriter.append(partition);
- if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
- {
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- estimatedTotalKeys / estimatedSSTables,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
+ sstableWriter.switchWriter(writer);
- sstableWriter.switchWriter(writer);
- }
- return rie != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 65924fa..07ca3d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -51,15 +52,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
private long currentBytesToWrite;
private int currentRatioIndex = 0;
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
+ this(cfs, directories, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
}
@SuppressWarnings("resource")
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
{
- super(cfs, txn, nonExpiredSSTables, false, false);
+ super(cfs, directories, txn, nonExpiredSSTables, false, false);
this.allSSTables = txn.originals();
totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
double[] potentialRatios = new double[20];
@@ -81,43 +82,38 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
}
}
ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
-
- sstableWriter.switchWriter(writer);
+ switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
}
@Override
- public boolean append(UnfilteredRowIterator partition)
+ public boolean realAppend(UnfilteredRowIterator partition)
{
RowIndexEntry rie = sstableWriter.append(partition);
if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
{
currentRatioIndex++;
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
- logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+ switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
}
return rie != null;
}
+
+ public void switchCompactionLocation(Directories.DataDirectory location)
+ {
+ long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
+ @SuppressWarnings("resource")
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+ currentPartitionsToWrite,
+ minRepairedAt,
+ cfs.metadata,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
+ logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+ sstableWriter.switchWriter(writer);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index c6cb979..520b229 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -338,6 +338,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return accumulate;
}
+
/**
* update a reader: if !original, this is a reader that is being introduced by this transaction;
* otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
@@ -355,6 +356,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
reader.setupOnline();
}
+ public void update(Collection<SSTableReader> readers, boolean original)
+ {
+ for(SSTableReader reader: readers)
+ {
+ update(reader, original);
+ }
+ }
+
/**
* mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 6f6aca9..d028493 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -186,11 +186,8 @@ public class Tracker
public void addSSTables(Iterable<SSTableReader> sstables)
{
addInitialSSTables(sstables);
- for (SSTableReader sstable : sstables)
- {
- maybeIncrementallyBackup(sstable);
- notifyAdded(sstable);
- }
+ maybeIncrementallyBackup(sstables);
+ notifyAdded(sstables);
}
/** (Re)initializes the tracker, purging all references. */
@@ -330,10 +327,10 @@ public class Tracker
apply(View.markFlushing(memtable));
}
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
assert !isDummy();
- if (sstable == null)
+ if (sstables == null || sstables.isEmpty())
{
// sstables may be null or empty if we flushed batchlog and nothing needed to be retained
// if so, we don't care what state the cfstore is in, we just replace it and continue
@@ -341,16 +338,16 @@ public class Tracker
return;
}
- sstable.setupOnline();
+ sstables.forEach(SSTableReader::setupOnline);
// back up before creating a new Snapshot (which makes the new one eligible for compaction)
- maybeIncrementallyBackup(sstable);
+ maybeIncrementallyBackup(sstables);
- apply(View.replaceFlushed(memtable, sstable));
+ apply(View.replaceFlushed(memtable, sstables));
Throwable fail;
- fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+ fail = updateSizeTracking(emptySet(), sstables, null);
// TODO: if we're invalidated, should we notifyAdded AND removed, or just skip both?
- fail = notifyAdded(sstable, fail);
+ fail = notifyAdded(sstables, fail);
if (!isDummy() && !cfstore.isValid())
dropSSTables();
@@ -377,13 +374,16 @@ public class Tracker
return view.get().getUncompacting(candidates);
}
- public void maybeIncrementallyBackup(final SSTableReader sstable)
+ public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
{
if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
return;
- File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
- sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+ for (SSTableReader sstable : sstables)
+ {
+ File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+ sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+ }
}
// NOTIFICATION
@@ -405,7 +405,7 @@ public class Tracker
return accumulate;
}
- Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+ Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
{
INotification notification = new SSTableAddedNotification(added);
for (INotificationConsumer subscriber : subscribers)
@@ -422,7 +422,7 @@ public class Tracker
return accumulate;
}
- public void notifyAdded(SSTableReader added)
+ public void notifyAdded(Iterable<SSTableReader> added)
{
maybeFail(notifyAdded(added, null));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 7ee0fdf..b62c7e3 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
}
// called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
- static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+ static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
{
return new Function<View, View>()
{
@@ -323,7 +323,7 @@ public class View
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
view.compactingMap, view.intervalTree);
- Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+ Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
SSTableIntervalTree.build(sstableMap.keySet()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f4b4da8..d94b219 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -60,11 +60,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
protected SSTableTxnWriter createWriter()
{
- return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
- 0,
- ActiveRepairService.UNREPAIRED_SSTABLE,
- 0,
- new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+ return SSTableTxnWriter.create(metadata,
+ createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+ 0,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ 0,
+ new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
new file mode 100644
index 0000000..0bb3721
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+/**
+ * A transactional writer that may produce one or more sstables from the
+ * partitions appended to it, giving compaction strategies control over how
+ * written data is split into sstables (CASSANDRA-8671). Callers must commit
+ * or abort the writer, per the Transactional contract.
+ */
+public interface SSTableMultiWriter extends Transactional
+{
+
+ /**
+ * Writes a partition in an implementation specific way
+ * @param partition the partition to append
+ * @return true if the partition was written, false otherwise
+ */
+ boolean append(UnfilteredRowIterator partition);
+
+ // Complete the write and return readers for all sstables produced.
+ Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult);
+ Collection<SSTableReader> finish(boolean openResult);
+ Collection<SSTableReader> finished();
+
+ // Controls whether finishing opens the resulting readers; returns the writer.
+ SSTableMultiWriter setOpenResult(boolean openResult);
+
+ String getFilename();
+ long getFilePointer();
+ UUID getCfId();
+
+ // Aborts the writer, rethrowing any failure raised during the abort.
+ static void abortOrDie(SSTableMultiWriter writer)
+ {
+ Throwables.maybeFail(writer.abort(null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 42bffb1..6e1ac38 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,13 +18,17 @@
package org.apache.cassandra.io.sstable;
-import org.apache.cassandra.db.RowIndexEntry;
+import java.util.Collection;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.concurrent.Transactional;
/**
@@ -35,15 +39,15 @@ import org.apache.cassandra.utils.concurrent.Transactional;
public class SSTableTxnWriter extends Transactional.AbstractTransactional implements Transactional
{
private final LifecycleTransaction txn;
- private final SSTableWriter writer;
+ private final SSTableMultiWriter writer;
- public SSTableTxnWriter(LifecycleTransaction txn, SSTableWriter writer)
+ public SSTableTxnWriter(LifecycleTransaction txn, SSTableMultiWriter writer)
{
this.txn = txn;
this.writer = writer;
}
- public RowIndexEntry append(UnfilteredRowIterator iterator)
+ public boolean append(UnfilteredRowIterator iterator)
{
return writer.append(iterator);
}
@@ -74,28 +78,43 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
writer.prepareToCommit();
}
- public SSTableReader finish(boolean openResult)
+ public Collection<SSTableReader> finish(boolean openResult)
{
writer.setOpenResult(openResult);
finish();
return writer.finished();
}
- public static SSTableTxnWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ return new SSTableTxnWriter(txn, writer);
+ }
+
+ /**
+ * Creates a txn writer for the given table metadata. If a live
+ * ColumnFamilyStore exists for the table, delegate to the cfs overload so the
+ * store chooses the SSTableMultiWriter; otherwise fall back to a plain
+ * SimpleSSTableMultiWriter over an offline transaction.
+ */
+ public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ {
+ // NOTE(review): Keyspace.open(cfm.ksName) is invoked twice on this path; a local variable would avoid the second lookup
+ if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId))
+ {
+ ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+ return create(cfs, descriptor, keyCount, repairedAt, sstableLevel, header);
+ }
+
+ // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+ MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
+ SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
+ return new SSTableTxnWriter(txn, writer);
+ }
- public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
Descriptor desc = Descriptor.fromFilename(filename);
- return create(desc, keyCount, repairedAt, sstableLevel, header);
+ return create(cfs, desc, keyCount, repairedAt, sstableLevel, header);
}
- public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header)
{
- return create(filename, keyCount, repairedAt, 0, header);
+ return create(cfs, filename, keyCount, repairedAt, 0, header);
}
}