You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/20 20:53:01 UTC
[1/2] cassandra git commit: Give compaction strategies more control
over sstable creation
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 0d866456a -> 9ed272773
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
new file mode 100644
index 0000000..2112656
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+
+public class SimpleSSTableMultiWriter implements SSTableMultiWriter
+{
+ private final SSTableWriter writer;
+
+ private SimpleSSTableMultiWriter(SSTableWriter writer)
+ {
+ this.writer = writer;
+ }
+
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ RowIndexEntry indexEntry = writer.append(partition);
+ return indexEntry != null;
+ }
+
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ return Collections.singleton(writer.finish(repairedAt, maxDataAge, openResult));
+ }
+
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ return Collections.singleton(writer.finish(openResult));
+ }
+
+ public Collection<SSTableReader> finished()
+ {
+ return Collections.singleton(writer.finished());
+ }
+
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ writer.setOpenResult(openResult);
+ return this;
+ }
+
+ public String getFilename()
+ {
+ return writer.getFilename();
+ }
+
+ public long getFilePointer()
+ {
+ return writer.getFilePointer();
+ }
+
+ public UUID getCfId()
+ {
+ return writer.metadata.cfId;
+ }
+
+ public Throwable commit(Throwable accumulate)
+ {
+ return writer.commit(accumulate);
+ }
+
+ public Throwable abort(Throwable accumulate)
+ {
+ return writer.abort(accumulate);
+ }
+
+ public void prepareToCommit()
+ {
+ writer.prepareToCommit();
+ }
+
+ public void close() throws Exception
+ {
+ writer.close();
+ }
+
+ public static SSTableMultiWriter create(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ CFMetaData cfm,
+ MetadataCollector metadataCollector,
+ SerializationHeader header,
+ LifecycleTransaction txn)
+ {
+ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+ return new SimpleSSTableMultiWriter(writer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index 15230ea..56d6130 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -21,8 +21,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
public class SSTableAddedNotification implements INotification
{
- public final SSTableReader added;
- public SSTableAddedNotification(SSTableReader added)
+ public final Iterable<SSTableReader> added;
+ public SSTableAddedNotification(Iterable<SSTableReader> added)
{
this.added = added;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index a098786..d4b7283 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -36,12 +36,11 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
@@ -85,7 +84,7 @@ public class StreamReader
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@SuppressWarnings("resource")
- public SSTableWriter read(ReadableByteChannel channel) throws IOException
+ public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
long totalSize = totalSize();
@@ -98,7 +97,7 @@ public class StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+ SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
@@ -115,7 +114,8 @@ public class StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ SSTableMultiWriter.abortOrDie(writer);
+
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
@@ -124,14 +124,16 @@ public class StreamReader
}
}
- protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+ protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
{
- Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
+ Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
- return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+ desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+
+
+ return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
}
protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -161,7 +163,7 @@ public class StreamReader
return size;
}
- protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException
+ protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException
{
DecoratedKey key = deserializer.newPartition();
writer.append(deserializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 7311069..2bcbbc1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -68,7 +69,7 @@ public class StreamReceiveTask extends StreamTask
private boolean done = false;
// holds references to SSTables received
- protected Collection<SSTableWriter> sstables;
+ protected Collection<SSTableMultiWriter> sstables;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
@@ -86,12 +87,12 @@ public class StreamReceiveTask extends StreamTask
*
* @param sstable SSTable file received.
*/
- public synchronized void received(SSTableWriter sstable)
+ public synchronized void received(SSTableMultiWriter sstable)
{
if (done)
return;
- assert cfId.equals(sstable.metadata.cfId);
+ assert cfId.equals(sstable.getCfId());
sstables.add(sstable);
if (sstables.size() == totalFiles)
@@ -126,8 +127,8 @@ public class StreamReceiveTask extends StreamTask
if (kscf == null)
{
// schema was dropped during streaming
- for (SSTableWriter writer : task.sstables)
- writer.abort();
+ task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+
task.sstables.clear();
task.txn.abort();
return;
@@ -138,11 +139,11 @@ public class StreamReceiveTask extends StreamTask
try
{
List<SSTableReader> readers = new ArrayList<>();
- for (SSTableWriter writer : task.sstables)
+ for (SSTableMultiWriter writer : task.sstables)
{
- SSTableReader reader = writer.finish(true);
- readers.add(reader);
- task.txn.update(reader, false);
+ Collection<SSTableReader> newReaders = writer.finish(true);
+ readers.addAll(newReaders);
+ task.txn.update(newReaders, false);
}
task.sstables.clear();
@@ -211,8 +212,7 @@ public class StreamReceiveTask extends StreamTask
return;
done = true;
- for (SSTableWriter writer : sstables)
- writer.abort();
+ sstables.forEach(SSTableMultiWriter::abortOrDie);
txn.abort();
sstables.clear();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 205291b..f702e24 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +62,7 @@ public class CompressedStreamReader extends StreamReader
*/
@Override
@SuppressWarnings("resource")
- public SSTableWriter read(ReadableByteChannel channel) throws IOException
+ public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
long totalSize = totalSize();
@@ -75,7 +75,7 @@ public class CompressedStreamReader extends StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+ SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
@@ -102,7 +102,7 @@ public class CompressedStreamReader extends StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ SSTableMultiWriter.abortOrDie(writer);
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index bce9691..19f9e12 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -17,12 +17,11 @@
*/
package org.apache.cassandra.streaming.messages;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -80,9 +79,9 @@ public class IncomingFileMessage extends StreamMessage
};
public FileMessageHeader header;
- public SSTableWriter sstable;
+ public SSTableMultiWriter sstable;
- public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header)
+ public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header)
{
super(Type.FILE);
this.header = header;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
index c8587d8..701bbc3 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
@@ -69,7 +68,7 @@ public class SSTableExpiredBlockers
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
- Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+ Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
Set<SSTableReader> sstables = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index cb3cc5c..4f4e904 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -76,7 +76,7 @@ public class SSTableLevelResetter
Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily);
boolean foundSSTable = false;
- for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
+ for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
{
if (sstable.getValue().contains(Component.STATS))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 95f516a..6554bd0 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -30,17 +30,14 @@ import java.util.Set;
import com.google.common.base.Throwables;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.Pair;
/**
* Create a decent leveling for the given keyspace/column family
@@ -95,7 +92,7 @@ public class SSTableOfflineRelevel
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
- Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+ Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
Set<SSTableReader> sstables = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index f64b8d9..f3a1a35 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -84,7 +84,7 @@ public class StandaloneScrubber
String snapshotName = "pre-scrub-" + System.currentTimeMillis();
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+ Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
List<SSTableReader> sstables = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 88e34b7..cf94c99 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -63,7 +63,7 @@ public class StandaloneUpgrader
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cf);
OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug);
- Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW);
+ Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW);
if (options.snapshot != null)
lister.onlyBackups(true).snapshots(options.snapshot);
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index 0b17e39..3412329 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -18,10 +18,6 @@
*/
package org.apache.cassandra.tools;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
@@ -35,7 +31,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.cli.*;
-import java.io.File;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -69,7 +64,7 @@ public class StandaloneVerifier
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName);
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+ Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
boolean extended = options.extended;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 7db978e..d684e11 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -114,9 +114,9 @@ public class LongCompactionsTest
builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
rows.put(key, builder.build());
}
- SSTableReader sstable = SSTableUtils.prepare().write(rows);
- sstables.add(sstable);
- store.addSSTable(sstable);
+ Collection<SSTableReader> readers = SSTableUtils.prepare().write(rows);
+ sstables.addAll(readers);
+ store.addSSTables(readers);
}
// give garbage collection a bit of time to catch up
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 249dd8d..6b50e49 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -86,7 +86,7 @@ public class MockSchema
public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
{
- Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(),
+ Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
cfs.keyspace.getName(),
cfs.getColumnFamilyName(),
generation);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 78f9e3e..6840e2b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -329,7 +329,7 @@ public class ColumnFamilyStoreTest
assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
- ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories);
+ ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories());
snapshotDetails = cfs.getSnapshotDetails();
assertEquals(1, snapshotDetails.size());
@@ -350,7 +350,7 @@ public class ColumnFamilyStoreTest
for (int version = 1; version <= 2; ++version)
{
- Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
+ Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 8889488..52b2aa8 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -324,12 +324,12 @@ public class ScrubTest
String filename = cfs.getSSTablePath(tempDataDir);
Descriptor desc = Descriptor.fromFilename(filename);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc,
- keys.size(),
- 0L,
- 0,
- SerializationHeader.make(cfs.metadata,
- Collections.emptyList())))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc,
+ keys.size(),
+ 0L,
+ 0,
+ SerializationHeader.make(cfs.metadata,
+ Collections.emptyList())))
{
for (String k : keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6dc5f53..a3167f9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.UpdateBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -162,10 +163,10 @@ public class AntiCompactionTest
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
- File dir = cfs.directories.getDirectoryForNewSSTables();
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
{
for (int i = 0; i < count; i++)
{
@@ -175,7 +176,10 @@ public class AntiCompactionTest
writer.append(builder.build().unfilteredIterator());
}
- return writer.finish(true);
+ Collection<SSTableReader> sstables = writer.finish(true);
+ assertNotNull(sstables);
+ assertEquals(1, sstables.size());
+ return sstables.iterator().next();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 1b94a6b..68936f5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -75,7 +75,7 @@ public class CompactionAwareWriterTest extends CQLTester
populate(rowCount);
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
- CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals());
+ CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals());
int rows = compact(cfs, txn, writer);
assertEquals(1, cfs.getLiveSSTables().size());
assertEquals(rowCount, rows);
@@ -94,7 +94,7 @@ public class CompactionAwareWriterTest extends CQLTester
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/10;
- CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0);
+ CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize, 0);
int rows = compact(cfs, txn, writer);
assertEquals(10, cfs.getLiveSSTables().size());
assertEquals(rowCount, rows);
@@ -111,7 +111,7 @@ public class CompactionAwareWriterTest extends CQLTester
populate(rowCount);
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
- CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0);
+ CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), 0);
int rows = compact(cfs, txn, writer);
long expectedSize = beforeSize / 2;
List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getLiveSSTables());
@@ -147,7 +147,7 @@ public class CompactionAwareWriterTest extends CQLTester
LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/targetSSTableCount;
- CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize);
+ CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize);
int rows = compact(cfs, txn, writer);
assertEquals(targetSSTableCount, cfs.getLiveSSTables().size());
int [] levelCounts = new int[5];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9d5e5fc..8035ac4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -346,7 +346,7 @@ public class LeveledCompactionStrategyTest
assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
unrepaired.removeSSTable(sstable2);
- strategy.handleNotification(new SSTableAddedNotification(sstable2), this);
+ strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 309e35a..9eff1b1 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -34,7 +34,6 @@ import junit.framework.Assert;
import org.apache.cassandra.MockSchema;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
@@ -50,7 +49,6 @@ import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
@@ -134,11 +132,11 @@ public class RealTransactionsTest extends SchemaLoader
{
cfs.truncateBlocking();
- String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
- String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+ String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+ String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
- .inDirectory(cfs.directories.getDirectoryForNewSSTables())
+ .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
.forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
.using(String.format(query, cfs.keyspace.getName(), cfs.name))
.build())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 3a943c4..9bcd9e7 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -30,6 +30,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -177,7 +178,7 @@ public class TrackerTest
Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
- Assert.assertEquals(3, listener.senders.size());
+ Assert.assertEquals(1, listener.senders.size());
Assert.assertEquals(tracker, listener.senders.get(0));
Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification);
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -296,10 +297,10 @@ public class TrackerTest
Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
- tracker.replaceFlushed(prev2, reader);
+ tracker.replaceFlushed(prev2, Collections.singleton(reader));
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(1, listener.received.size());
- Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
listener.received.clear();
Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@ -313,12 +314,12 @@ public class TrackerTest
tracker.markFlushing(prev1);
reader = MockSchema.sstable(0, 10, true, cfs);
cfs.invalidate(false);
- tracker.replaceFlushed(prev1, reader);
+ tracker.replaceFlushed(prev1, Collections.singleton(reader));
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(3, listener.received.size());
- Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -332,8 +333,8 @@ public class TrackerTest
Tracker tracker = new Tracker(null, false);
MockListener listener = new MockListener(false);
tracker.subscribe(listener);
- tracker.notifyAdded(r1);
- Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+ tracker.notifyAdded(singleton(r1));
+ Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
listener.received.clear();
tracker.notifyDeleting(r1);
Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting);
@@ -353,8 +354,8 @@ public class TrackerTest
MockListener failListener = new MockListener(true);
tracker.subscribe(failListener);
tracker.subscribe(listener);
- Assert.assertNotNull(tracker.notifyAdded(r1, null));
- Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null));
+ Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
listener.received.clear();
Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 40afa54..523c203 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -195,7 +195,7 @@ public class ViewTest
Assert.assertEquals(memtable3, cur.getCurrentMemtable());
SSTableReader sstable = MockSchema.sstable(1, cfs);
- cur = View.replaceFlushed(memtable1, sstable).apply(cur);
+ cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
Assert.assertEquals(0, cur.flushingMemtables.size());
Assert.assertEquals(1, cur.liveMemtables.size());
Assert.assertEquals(memtable3, cur.getCurrentMemtable());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 856ef7c..e1ab48f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -64,12 +64,12 @@ public class BigTableWriterTest extends AbstractTransactionalTest
private TestableBTW()
{
- this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+ this(cfs.getSSTablePath(cfs.getDirectories().getDirectoryForNewSSTables()));
}
private TestableBTW(String file)
{
- this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+ this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
}
private TestableBTW(String file, SSTableTxnWriter sw)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index ceeb369..a9165f7 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.FileUtils;
@@ -44,6 +45,7 @@ public class CQLSSTableWriterClientTest
public void setUp()
{
this.testDirectory = Files.createTempDir();
+ Keyspace.setInitialized();
}
@After
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index d9516cb..1c61f51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
truncate(cfs);
- File dir = cfs.directories.getDirectoryForNewSSTables();
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
try (SSTableWriter writer = getWriter(cfs, dir, txn))
{
@@ -941,10 +941,10 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> result = new LinkedHashSet<>();
for (int f = 0 ; f < fileCount ; f++)
{
- File dir = cfs.directories.getDirectoryForNewSSTables();
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
String filename = cfs.getSSTablePath(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
{
int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
for ( ; i < end ; i++)
@@ -955,7 +955,7 @@ public class SSTableRewriterTest extends SchemaLoader
writer.append(builder.build().unfilteredIterator());
}
- result.add(writer.finish(true));
+ result.addAll(writer.finish(true));
}
}
return result;
@@ -972,7 +972,7 @@ public class SSTableRewriterTest extends SchemaLoader
liveDescriptors.add(sstable.descriptor.generation);
spaceUsed += sstable.bytesOnDisk();
}
- for (File dir : cfs.directories.getCFDirectories())
+ for (File dir : cfs.getDirectories().getCFDirectories())
{
for (File f : dir.listFiles())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 6de5bb9..89c0d61 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
@@ -164,7 +165,7 @@ public class SSTableUtils
return this;
}
- public SSTableReader write(Set<String> keys) throws IOException
+ public Collection<SSTableReader> write(Set<String> keys) throws IOException
{
Map<String, PartitionUpdate> map = new HashMap<>();
for (String key : keys)
@@ -176,7 +177,7 @@ public class SSTableUtils
return write(map);
}
- public SSTableReader write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
+ public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
{
final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
return write(sorted.size(), new Appender()
@@ -192,7 +193,7 @@ public class SSTableUtils
});
}
- public SSTableReader write(Map<String, PartitionUpdate> entries) throws IOException
+ public Collection<SSTableReader> write(Map<String, PartitionUpdate> entries) throws IOException
{
SortedMap<DecoratedKey, PartitionUpdate> sorted = new TreeMap<>();
for (Map.Entry<String, PartitionUpdate> entry : entries.entrySet())
@@ -201,18 +202,22 @@ public class SSTableUtils
return write(sorted);
}
- public SSTableReader write(int expectedSize, Appender appender) throws IOException
+ public Collection<SSTableReader> write(int expectedSize, Appender appender) throws IOException
{
File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
- SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST);
- SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
+ CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
+ SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+ SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
while (appender.append(writer)) { /* pass */ }
- SSTableReader reader = writer.finish(true);
+ Collection<SSTableReader> readers = writer.finish(true);
+
// mark all components for removal
if (cleanup)
- for (Component component : reader.components)
- new File(reader.descriptor.filenameFor(component)).deleteOnExit();
- return reader;
+ for (SSTableReader reader: readers)
+ for (Component component : reader.components)
+ new File(reader.descriptor.filenameFor(component)).deleteOnExit();
+ return readers;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index c10865a..c3bcf1a 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -204,7 +204,7 @@ public class DefsTest
ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(store);
store.forceBlockingFlush();
- assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
+ assertTrue(store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
@@ -226,7 +226,7 @@ public class DefsTest
// verify that the files are gone.
Supplier<Object> lambda = () -> {
- for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles())
+ for (File file : store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).listFiles())
{
if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists())
return false;
@@ -275,7 +275,7 @@ public class DefsTest
ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assertNotNull(cfs);
cfs.forceBlockingFlush();
- assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
+ assertTrue(!cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
MigrationManager.announceKeyspaceDrop(ks.name);
[2/2] cassandra git commit: Give compaction strategies more control
over sstable creation
Posted by ma...@apache.org.
Give compaction strategies more control over sstable creation
Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-8671
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed27277
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed27277
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed27277
Branch: refs/heads/cassandra-3.0
Commit: 9ed2727739c73d64086d09a86a407a77390f081a
Parents: 0d86645
Author: Blake Eggleston <bd...@gmail.com>
Authored: Thu Aug 6 10:19:55 2015 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Aug 20 20:47:40 2015 +0200
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 73 +++++++++---
.../org/apache/cassandra/db/Directories.java | 42 +++++--
src/java/org/apache/cassandra/db/Keyspace.java | 5 +
src/java/org/apache/cassandra/db/Memtable.java | 32 +++--
.../compaction/AbstractCompactionStrategy.java | 24 +++-
.../db/compaction/AbstractCompactionTask.java | 3 +-
.../db/compaction/CompactionManager.java | 6 +-
.../compaction/CompactionStrategyManager.java | 40 +++++--
.../cassandra/db/compaction/CompactionTask.java | 22 ++--
.../db/compaction/LeveledCompactionTask.java | 6 +-
.../db/compaction/SSTableSplitter.java | 3 +-
.../cassandra/db/compaction/Scrubber.java | 3 +-
.../SizeTieredCompactionStrategy.java | 4 +-
.../writers/CompactionAwareWriter.java | 53 ++++++---
.../writers/DefaultCompactionWriter.java | 32 ++---
.../writers/MajorLeveledCompactionWriter.java | 46 ++++----
.../writers/MaxSSTableSizeWriter.java | 45 ++++---
.../SplittingSizeTieredCompactionWriter.java | 52 ++++-----
.../db/lifecycle/LifecycleTransaction.java | 9 ++
.../apache/cassandra/db/lifecycle/Tracker.java | 34 +++---
.../org/apache/cassandra/db/lifecycle/View.java | 4 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 11 +-
.../io/sstable/SSTableMultiWriter.java | 54 +++++++++
.../cassandra/io/sstable/SSTableTxnWriter.java | 43 +++++--
.../io/sstable/SimpleSSTableMultiWriter.java | 116 +++++++++++++++++++
.../notifications/SSTableAddedNotification.java | 4 +-
.../cassandra/streaming/StreamReader.java | 22 ++--
.../cassandra/streaming/StreamReceiveTask.java | 22 ++--
.../compress/CompressedStreamReader.java | 8 +-
.../streaming/messages/IncomingFileMessage.java | 7 +-
.../cassandra/tools/SSTableExpiredBlockers.java | 3 +-
.../cassandra/tools/SSTableLevelResetter.java | 2 +-
.../cassandra/tools/SSTableOfflineRelevel.java | 5 +-
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../cassandra/tools/StandaloneUpgrader.java | 2 +-
.../cassandra/tools/StandaloneVerifier.java | 7 +-
.../db/compaction/LongCompactionsTest.java | 6 +-
test/unit/org/apache/cassandra/MockSchema.java | 2 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 12 +-
.../db/compaction/AntiCompactionTest.java | 10 +-
.../compaction/CompactionAwareWriterTest.java | 8 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../db/lifecycle/RealTransactionsTest.java | 8 +-
.../cassandra/db/lifecycle/TrackerTest.java | 19 +--
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../io/sstable/BigTableWriterTest.java | 4 +-
.../io/sstable/CQLSSTableWriterClientTest.java | 2 +
.../io/sstable/SSTableRewriterTest.java | 10 +-
.../cassandra/io/sstable/SSTableUtils.java | 25 ++--
.../org/apache/cassandra/schema/DefsTest.java | 6 +-
51 files changed, 651 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a12de0a..b199c77 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.TableMetrics.Sampler;
import org.apache.cassandra.metrics.TableMetrics;
@@ -75,6 +76,33 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
+ // the directories used to load sstables on cfs instantiation
+ private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories;
+
+ /**
+ * a hook to add additional directories to initialDirectories.
+ * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup
+ */
+ public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories)
+ {
+ assert newDirectories != null;
+
+ Set<Directories.DataDirectory> existing = Sets.newHashSet(initialDirectories);
+
+ List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories);
+ for (Directories.DataDirectory directory: newDirectories)
+ {
+ if (!existing.contains(directory))
+ {
+ replacementList.add(directory);
+ }
+ }
+
+ Directories.DataDirectory[] replacementArray = new Directories.DataDirectory[replacementList.size()];
+ replacementList.toArray(replacementArray);
+ initialDirectories = replacementArray;
+ }
+
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
@@ -164,7 +192,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private volatile DefaultInteger maxCompactionThreshold;
private final CompactionStrategyManager compactionStrategyManager;
- public final Directories directories;
+ private volatile Directories directories;
public final TableMetrics metric;
public volatile long sampleLatencyNanos;
@@ -189,6 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
compactionStrategyManager.maybeReload(metadata);
+ directories = compactionStrategyManager.getDirectories();
scheduleFlush();
@@ -330,6 +359,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean loadSSTables,
boolean registerBookkeeping)
{
+ assert directories != null;
assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
this.keyspace = keyspace;
@@ -363,6 +393,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// compaction strategy should be created after the CFS has been prepared
this.compactionStrategyManager = new CompactionStrategyManager(this);
+ this.directories = this.compactionStrategyManager.getDirectories();
if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
{
@@ -426,6 +457,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ public Directories getDirectories()
+ {
+ return directories;
+ }
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ {
+ MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+ {
+ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+ }
+
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
public void invalidate()
{
@@ -499,7 +546,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean loadSSTables)
{
// get the max generation number, to prevent generation conflicts
- Directories directories = new Directories(metadata);
+ Directories directories = new Directories(metadata, initialDirectories);
Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
List<Integer> generations = new ArrayList<Integer>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -633,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
currentDescriptors.add(sstable.descriptor);
Set<SSTableReader> newSSTables = new HashSet<>();
- Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+ Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor descriptor = entry.getKey();
@@ -1378,9 +1425,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
}
- void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- compactionStrategyManager.replaceFlushed(memtable, sstable);
+ compactionStrategyManager.replaceFlushed(memtable, sstables);
}
public boolean isValid()
@@ -1580,7 +1627,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
{
- final File manifestFile = directories.getSnapshotManifestFile(snapshotName);
+ final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName);
try
{
@@ -1602,7 +1649,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private void createEphemeralSnapshotMarkerFile(final String snapshot)
{
- final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot);
+ final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);
try
{
@@ -1635,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Map<Integer, SSTableReader> active = new HashMap<>();
for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
active.put(sstable.descriptor.generation, sstable);
- Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
+ Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
Refs<SSTableReader> refs = new Refs<>();
try
{
@@ -1692,12 +1739,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public boolean snapshotExists(String snapshotName)
{
- return directories.snapshotExists(snapshotName);
+ return getDirectories().snapshotExists(snapshotName);
}
public long getSnapshotCreationTime(String snapshotName)
{
- return directories.snapshotCreationTime(snapshotName);
+ return getDirectories().snapshotCreationTime(snapshotName);
}
/**
@@ -1708,7 +1755,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public void clearSnapshot(String snapshotName)
{
- List<File> snapshotDirs = directories.getCFDirectories();
+ List<File> snapshotDirs = getDirectories().getCFDirectories();
Directories.clearSnapshot(snapshotName, snapshotDirs);
}
/**
@@ -1718,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public Map<String, Pair<Long,Long>> getSnapshotDetails()
{
- return directories.getSnapshotDetails();
+ return getDirectories().getSnapshotDetails();
}
/**
@@ -2251,7 +2298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long trueSnapshotsSize()
{
- return directories.trueSnapshotsSize();
+ return getDirectories().trueSnapshotsSize();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index fa01269..90d2085 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -178,30 +178,36 @@ public class Directories
}
private final CFMetaData metadata;
+ private final DataDirectory[] paths;
private final File[] dataPaths;
+ public Directories(final CFMetaData metadata)
+ {
+ this(metadata, dataDirectories);
+ }
/**
* Create Directories of given ColumnFamily.
* SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time.
*
* @param metadata metadata of ColumnFamily
*/
- public Directories(final CFMetaData metadata)
+ public Directories(final CFMetaData metadata, DataDirectory[] paths)
{
this.metadata = metadata;
+ this.paths = paths;
String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;
- this.dataPaths = new File[dataDirectories.length];
+ this.dataPaths = new File[paths.length];
// If upgraded from version less than 2.1, use existing directories
String oldSSTableRelativePath = join(metadata.ksName, cfName);
- for (int i = 0; i < dataDirectories.length; ++i)
+ for (int i = 0; i < paths.length; ++i)
{
// check if old SSTable directory exists
- dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath);
+ dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
}
boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
{
@@ -214,13 +220,13 @@ public class Directories
{
// use 2.1+ style
String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
- for (int i = 0; i < dataDirectories.length; ++i)
- dataPaths[i] = new File(dataDirectories[i].location, newSSTableRelativePath);
+ for (int i = 0; i < paths.length; ++i)
+ dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
}
// if index, then move to its own directory
if (indexNameWithDot != null)
{
- for (int i = 0; i < dataDirectories.length; ++i)
+ for (int i = 0; i < paths.length; ++i)
dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
}
@@ -327,7 +333,7 @@ public class Directories
// pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
boolean tooBig = false;
- for (DataDirectory dataDir : dataDirectories)
+ for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
{
@@ -393,7 +399,7 @@ public class Directories
long writeSize = expectedTotalWriteSize / estimatedSSTables;
long totalAvailable = 0L;
- for (DataDirectory dataDir : dataDirectories)
+ for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
continue;
@@ -481,6 +487,24 @@ public class Directories
{
return location.getUsableSpace();
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ DataDirectory that = (DataDirectory) o;
+
+ return location.equals(that.location);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return location.hashCode();
+ }
}
static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..c2613fe 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -210,6 +210,11 @@ public class Keyspace
return cfs;
}
+ public boolean hasColumnFamilyStore(UUID id)
+ {
+ return columnFamilyStores.containsKey(id);
+ }
+
/**
* Take a snapshot of the specific column family, or the entire set of column families
* if columnFamily is null with a given timestamp
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1b30fc7..4a54666 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -345,22 +344,22 @@ public class Memtable implements Comparable<Memtable>
{
long writeSize = getExpectedWriteSize();
Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+ File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
assert sstableDirectory != null : "Flush task is not bound to any disk";
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
+ Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
+ cfs.replaceFlushed(Memtable.this, sstables);
}
protected Directories getDirectories()
{
- return cfs.directories;
+ return cfs.getDirectories();
}
- private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+ private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
{
logger.info("Writing {}", Memtable.this.toString());
- SSTableReader ssTable;
+ Collection<SSTableReader> ssTables;
try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
boolean trackContention = logger.isDebugEnabled();
@@ -397,20 +396,20 @@ public class Memtable implements Comparable<Memtable>
context));
// sstables should contain non-repaired data.
- ssTable = writer.finish(true);
+ ssTables = writer.finish(true);
}
else
{
logger.info("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
writer.getFilename(), context);
writer.abort();
- ssTable = null;
+ ssTables = null;
}
if (heavilyContendedRowCount > 0)
logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTable;
+ return ssTables;
}
}
@@ -423,13 +422,12 @@ public class Memtable implements Comparable<Memtable>
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
return new SSTableTxnWriter(txn,
- SSTableWriter.create(Descriptor.fromFilename(filename),
- (long)partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- cfs.metadata,
- sstableMetadataCollector,
- new SerializationHeader(cfs.metadata, columns, stats),
- txn));
+ cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)partitions.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(cfs.metadata, columns, stats),
+ txn));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index d9c9ea3..721fd70 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -25,7 +25,12 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -113,6 +119,11 @@ public abstract class AbstractCompactionStrategy
}
}
+ public Directories getDirectories()
+ {
+ return cfs.getDirectories();
+ }
+
/**
* For internal, temporary suspension of background compactions so that we can do exceptional
* things like truncate or major compaction
@@ -222,12 +233,12 @@ public abstract class AbstractCompactionStrategy
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
- * @param sstable the written sstable. can be null if the memtable was clean.
+ * @param sstables the written sstables. can be null or empty if the memtable was clean.
*/
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- cfs.getTracker().replaceFlushed(memtable, sstable);
- if (sstable != null)
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
@@ -493,4 +504,9 @@ public abstract class AbstractCompactionStrategy
groupedSSTables.add(currGroup);
return groupedSSTables;
}
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+ {
+ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 3bf224e..155bf2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
transaction.close();
}
}
- public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
+ public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 8aa16d5..66f9ed5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -594,7 +594,7 @@ public class CompactionManager implements CompactionManagerMBean
}
// group by keyspace/columnfamily
ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
- descriptors.put(cfs, cfs.directories.find(new File(filename.trim()).getName()));
+ descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName()));
}
List<Future<?>> futures = new ArrayList<>();
@@ -817,7 +817,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
+ File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -1192,7 +1192,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Anticompacting {}", anticompactionGroup);
Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
- File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+ File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
long repairedKeyCount = 0;
long unrepairedKeyCount = 0;
int nowInSec = FBUtilities.nowInSeconds();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f5097af..47c8de8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
/**
* Manages the compaction strategies.
@@ -181,10 +187,10 @@ public class CompactionStrategyManager implements INotificationConsumer
startup();
}
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- cfs.getTracker().replaceFlushed(memtable, sstable);
- if (sstable != null)
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
CompactionManager.instance.submitBackground(cfs);
}
@@ -235,16 +241,24 @@ public class CompactionStrategyManager implements INotificationConsumer
return repaired.shouldDefragment();
}
+ public Directories getDirectories()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.getDirectories();
+ }
public synchronized void handleNotification(INotification notification, Object sender)
{
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
- if (flushedNotification.added.isRepaired())
- repaired.addSSTable(flushedNotification.added);
- else
- unrepaired.addSSTable(flushedNotification.added);
+ for (SSTableReader sstable : flushedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repaired.addSSTable(sstable);
+ else
+ unrepaired.addSSTable(sstable);
+ }
}
else if (notification instanceof SSTableListChangedNotification)
{
@@ -484,4 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
{
return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
}
+
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ {
+ if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ {
+ return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ else
+ {
+ return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0bd6aae..1d96324 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -29,9 +28,9 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -43,7 +42,6 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -51,8 +49,8 @@ public class CompactionTask extends AbstractCompactionTask
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
protected final int gcBefore;
- private final boolean offline;
- private final boolean keepOriginals;
+ protected final boolean offline;
+ protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
private CompactionExecutorStatsCollector collector;
@@ -154,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
{
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
- List<SSTableReader> newSStables;
+ Collection<SSTableReader> newSStables;
long[] mergedRowCounts;
@@ -173,7 +171,7 @@ public class CompactionTask extends AbstractCompactionTask
if (!controller.cfs.getCompactionStrategyManager().isActive)
throw new CompactionInterruptedException(ci.getCompactionInfo());
- try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+ try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
{
estimatedKeys = writer.estimatedKeys();
while (ci.hasNext())
@@ -228,10 +226,11 @@ public class CompactionTask extends AbstractCompactionTask
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction transaction,
Set<SSTableReader> nonExpiredSSTables)
{
- return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals);
+ return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
}
public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
@@ -252,6 +251,11 @@ public class CompactionTask extends AbstractCompactionTask
return mergeSummary.toString();
}
+ protected Directories getDirectories()
+ {
+ return cfs.getDirectories();
+ }
+
public static long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
{
long minRepairedAt= Long.MAX_VALUE;
@@ -264,7 +268,7 @@ public class CompactionTask extends AbstractCompactionTask
protected void checkAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize)
{
- while (!cfs.directories.hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
{
if (!reduceScopeForLimitedSpace())
throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 11d113d..eeb3615 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
@@ -42,12 +43,13 @@ public class LeveledCompactionTask extends CompactionTask
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
if (majorCompaction)
- return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
- return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
+ return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
+ return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 1944364..3655a37 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -75,10 +75,11 @@ public class SSTableSplitter {
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
- return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
+ return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 747b956..c437832 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.UUIDGen;
@@ -106,7 +105,7 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
// Calculate the expected compacted filesize
- this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
+ this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
if (destination == null)
throw new IOException("disk full");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 2353aa3..05f446c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -342,10 +343,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables)
{
- return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
+ return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, nonExpiredSSTables);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 50e5a96..abc4107 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,14 +18,17 @@
package org.apache.cassandra.db.compaction.writers;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -38,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
{
protected final ColumnFamilyStore cfs;
+ protected final Directories directories;
protected final Set<SSTableReader> nonExpiredSSTables;
protected final long estimatedTotalKeys;
protected final long maxAge;
@@ -45,35 +49,25 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final LifecycleTransaction txn;
protected final SSTableRewriter sstableWriter;
+ private boolean isInitialized = false;
public CompactionAwareWriter(ColumnFamilyStore cfs,
- LifecycleTransaction txn,
- Set<SSTableReader> nonExpiredSSTables)
- {
- this(cfs, txn, nonExpiredSSTables, false, false);
- }
-
- public CompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
boolean offline,
boolean keepOriginals)
{
this.cfs = cfs;
+ this.directories = directories;
this.nonExpiredSSTables = nonExpiredSSTables;
this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
this.txn = txn;
this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals);
- }
- /**
- * Writes a partition in an implementation specific way
- * @param partition the partition to append
- * @return true if the partition was written, false otherwise
- */
- public abstract boolean append(UnfilteredRowIterator partition);
+ }
@Override
protected Throwable doAbort(Throwable accumulate)
@@ -98,7 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
* @return all the written sstables sstables
*/
@Override
- public List<SSTableReader> finish()
+ public Collection<SSTableReader> finish()
{
super.finish();
return sstableWriter.finished();
@@ -112,12 +106,39 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return estimatedTotalKeys;
}
+ public final boolean append(UnfilteredRowIterator partition)
+ {
+ maybeSwitchWriter(partition.partitionKey());
+ return realAppend(partition);
+ }
+
+ protected abstract boolean realAppend(UnfilteredRowIterator partition);
+
+ /**
+ * Guaranteed to be called before the first call to realAppend.
+ * @param key
+ */
+ protected void maybeSwitchWriter(DecoratedKey key)
+ {
+ if (!isInitialized)
+ switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
+ isInitialized = true;
+ }
+
+ /**
+ * Implementations of this method should finish the current sstable writer and start writing to this directory.
+ *
+ * Called once before starting to append and then whenever we see a need to start writing to another directory.
+ * @param directory
+ */
+ protected abstract void switchCompactionLocation(Directories.DataDirectory directory);
+
/**
* The directories we can write to
*/
public Directories getDirectories()
{
- return cfs.directories;
+ return directories;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index eb55d20..8b90224 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -40,20 +40,28 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
- public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, txn, nonExpiredSSTables, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, false, false);
}
@SuppressWarnings("resource")
- public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+ {
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+ }
+
+ @Override
+ public boolean realAppend(UnfilteredRowIterator partition)
+ {
+ return sstableWriter.append(partition) != null;
+ }
+
+ @Override
+ protected void switchCompactionLocation(Directories.DataDirectory directory)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
- logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
- long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
@@ -64,12 +72,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
}
@Override
- public boolean append(UnfilteredRowIterator partition)
- {
- return sstableWriter.append(partition) != null;
- }
-
- @Override
public long estimatedKeys()
{
return estimatedTotalKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 73ce216..6d191f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,43 +48,32 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
private int sstablesWritten = 0;
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize)
{
- this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false);
}
@SuppressWarnings("resource")
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
boolean offline,
boolean keepOriginals)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
this.maxSSTableSize = maxSSTableSize;
this.allSSTables = txn.originals();
expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
- long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- keysPerSSTable,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
}
@Override
@SuppressWarnings("resource")
- public boolean append(UnfilteredRowIterator partition)
+ public boolean realAppend(UnfilteredRowIterator partition)
{
long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
RowIndexEntry rie = sstableWriter.append(partition);
@@ -98,19 +88,25 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
}
averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- averageEstimatedKeysPerSSTable,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
+ switchCompactionLocation(getWriteDirectory(expectedWriteSize));
partitionsWritten = 0;
sstablesWritten++;
}
return rie != null;
}
+
+ public void switchCompactionLocation(Directories.DataDirectory directory)
+ {
+ File sstableDirectory = getDirectories().getLocationForDisk(directory);
+ @SuppressWarnings("resource")
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ averageEstimatedKeysPerSSTable,
+ minRepairedAt,
+ cfs.metadata,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
+ sstableWriter.switchWriter(writer);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 241af0d..142fe87 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,10 +17,10 @@
*/
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -40,16 +40,18 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
private final Set<SSTableReader> allSSTables;
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
int level)
{
- this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
}
@SuppressWarnings("resource")
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+ Directories directories,
LifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
long maxSSTableSize,
@@ -57,7 +59,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
boolean offline,
boolean keepOriginals)
{
- super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+ super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
this.allSSTables = txn.originals();
this.level = level;
this.maxSSTableSize = maxSSTableSize;
@@ -65,37 +67,30 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
expectedWriteSize = Math.min(maxSSTableSize, totalSize);
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+ }
+
+ @Override
+ public boolean realAppend(UnfilteredRowIterator partition)
+ {
+ RowIndexEntry rie = sstableWriter.append(partition);
+ if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+ switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+ return rie != null;
+ }
+
+ public void switchCompactionLocation(Directories.DataDirectory location)
+ {
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
txn);
- sstableWriter.switchWriter(writer);
- }
- @Override
- public boolean append(UnfilteredRowIterator partition)
- {
- RowIndexEntry rie = sstableWriter.append(partition);
- if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
- {
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- estimatedTotalKeys / estimatedSSTables,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
+ sstableWriter.switchWriter(writer);
- sstableWriter.switchWriter(writer);
- }
- return rie != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 65924fa..07ca3d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -51,15 +52,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
private long currentBytesToWrite;
private int currentRatioIndex = 0;
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
+ this(cfs, directories, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
}
@SuppressWarnings("resource")
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
{
- super(cfs, txn, nonExpiredSSTables, false, false);
+ super(cfs, directories, txn, nonExpiredSSTables, false, false);
this.allSSTables = txn.originals();
totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
double[] potentialRatios = new double[20];
@@ -81,43 +82,38 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
}
}
ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
-
- sstableWriter.switchWriter(writer);
+ switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
}
@Override
- public boolean append(UnfilteredRowIterator partition)
+ public boolean realAppend(UnfilteredRowIterator partition)
{
RowIndexEntry rie = sstableWriter.append(partition);
if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
{
currentRatioIndex++;
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
- File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- txn);
- sstableWriter.switchWriter(writer);
- logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+ switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
}
return rie != null;
}
+
+ public void switchCompactionLocation(Directories.DataDirectory location)
+ {
+ long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
+ @SuppressWarnings("resource")
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+ currentPartitionsToWrite,
+ minRepairedAt,
+ cfs.metadata,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
+ logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+ sstableWriter.switchWriter(writer);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index c6cb979..520b229 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -338,6 +338,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return accumulate;
}
+
/**
* update a reader: if !original, this is a reader that is being introduced by this transaction;
* otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
@@ -355,6 +356,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
reader.setupOnline();
}
+ public void update(Collection<SSTableReader> readers, boolean original)
+ {
+ for(SSTableReader reader: readers)
+ {
+ update(reader, original);
+ }
+ }
+
/**
* mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 6f6aca9..d028493 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -186,11 +186,8 @@ public class Tracker
public void addSSTables(Iterable<SSTableReader> sstables)
{
addInitialSSTables(sstables);
- for (SSTableReader sstable : sstables)
- {
- maybeIncrementallyBackup(sstable);
- notifyAdded(sstable);
- }
+ maybeIncrementallyBackup(sstables);
+ notifyAdded(sstables);
}
/** (Re)initializes the tracker, purging all references. */
@@ -330,10 +327,10 @@ public class Tracker
apply(View.markFlushing(memtable));
}
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
assert !isDummy();
- if (sstable == null)
+ if (sstables == null || sstables.isEmpty())
{
// sstable may be null if we flushed batchlog and nothing needed to be retained
// if it's null, we don't care what state the cfstore is in, we just replace it and continue
@@ -341,16 +338,16 @@ public class Tracker
return;
}
- sstable.setupOnline();
+ sstables.forEach(SSTableReader::setupOnline);
// back up before creating a new Snapshot (which makes the new one eligible for compaction)
- maybeIncrementallyBackup(sstable);
+ maybeIncrementallyBackup(sstables);
- apply(View.replaceFlushed(memtable, sstable));
+ apply(View.replaceFlushed(memtable, sstables));
Throwable fail;
- fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+ fail = updateSizeTracking(emptySet(), sstables, null);
// TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
- fail = notifyAdded(sstable, fail);
+ fail = notifyAdded(sstables, fail);
if (!isDummy() && !cfstore.isValid())
dropSSTables();
@@ -377,13 +374,16 @@ public class Tracker
return view.get().getUncompacting(candidates);
}
- public void maybeIncrementallyBackup(final SSTableReader sstable)
+ public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
{
if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
return;
- File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
- sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+ for (SSTableReader sstable : sstables)
+ {
+ File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+ sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+ }
}
// NOTIFICATION
@@ -405,7 +405,7 @@ public class Tracker
return accumulate;
}
- Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+ Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
{
INotification notification = new SSTableAddedNotification(added);
for (INotificationConsumer subscriber : subscribers)
@@ -422,7 +422,7 @@ public class Tracker
return accumulate;
}
- public void notifyAdded(SSTableReader added)
+ public void notifyAdded(Iterable<SSTableReader> added)
{
maybeFail(notifyAdded(added, null));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 7ee0fdf..b62c7e3 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
}
// called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
- static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+ static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
{
return new Function<View, View>()
{
@@ -323,7 +323,7 @@ public class View
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
view.compactingMap, view.intervalTree);
- Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+ Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
SSTableIntervalTree.build(sstableMap.keySet()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f4b4da8..d94b219 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -60,11 +60,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
protected SSTableTxnWriter createWriter()
{
- return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
- 0,
- ActiveRepairService.UNREPAIRED_SSTABLE,
- 0,
- new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+ return SSTableTxnWriter.create(metadata,
+ createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+ 0,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ 0,
+ new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
new file mode 100644
index 0000000..0bb3721
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public interface SSTableMultiWriter extends Transactional
+{
+
+ /**
+ * Writes a partition in an implementation specific way
+ * @param partition the partition to append
+ * @return true if the partition was written, false otherwise
+ */
+ boolean append(UnfilteredRowIterator partition);
+
+ Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult);
+ Collection<SSTableReader> finish(boolean openResult);
+ Collection<SSTableReader> finished();
+
+ SSTableMultiWriter setOpenResult(boolean openResult);
+
+ String getFilename();
+ long getFilePointer();
+ UUID getCfId();
+
+ static void abortOrDie(SSTableMultiWriter writer)
+ {
+ Throwables.maybeFail(writer.abort(null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 42bffb1..6e1ac38 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,13 +18,17 @@
package org.apache.cassandra.io.sstable;
-import org.apache.cassandra.db.RowIndexEntry;
+import java.util.Collection;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.concurrent.Transactional;
/**
@@ -35,15 +39,15 @@ import org.apache.cassandra.utils.concurrent.Transactional;
public class SSTableTxnWriter extends Transactional.AbstractTransactional implements Transactional
{
private final LifecycleTransaction txn;
- private final SSTableWriter writer;
+ private final SSTableMultiWriter writer;
- public SSTableTxnWriter(LifecycleTransaction txn, SSTableWriter writer)
+ public SSTableTxnWriter(LifecycleTransaction txn, SSTableMultiWriter writer)
{
this.txn = txn;
this.writer = writer;
}
- public RowIndexEntry append(UnfilteredRowIterator iterator)
+ public boolean append(UnfilteredRowIterator iterator)
{
return writer.append(iterator);
}
@@ -74,28 +78,43 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
writer.prepareToCommit();
}
- public SSTableReader finish(boolean openResult)
+ public Collection<SSTableReader> finish(boolean openResult)
{
writer.setOpenResult(openResult);
finish();
return writer.finished();
}
- public static SSTableTxnWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ return new SSTableTxnWriter(txn, writer);
+ }
+
+ public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ {
+ if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId))
+ {
+ ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+ return create(cfs, descriptor, keyCount, repairedAt, sstableLevel, header);
+ }
+
+ // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+ MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
+ SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
return new SSTableTxnWriter(txn, writer);
}
- public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
{
Descriptor desc = Descriptor.fromFilename(filename);
- return create(desc, keyCount, repairedAt, sstableLevel, header);
+ return create(cfs, desc, keyCount, repairedAt, sstableLevel, header);
}
- public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header)
{
- return create(filename, keyCount, repairedAt, 0, header);
+ return create(cfs, filename, keyCount, repairedAt, 0, header);
}
}