You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/02/15 17:21:06 UTC
[2/3] git commit: Add flush directory distinct from compaction
directories patch by jbellis and aleksey for CASSANDRA-6357
Add flush directory distinct from compaction directories
patch by jbellis and aleksey for CASSANDRA-6357
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f10148f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f10148f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f10148f4
Branch: refs/heads/trunk
Commit: f10148f4f1719f99bc3888b93f5a285c8af997d1
Parents: fc503dd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Feb 15 10:11:28 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Feb 15 10:11:28 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 15 +-
.../org/apache/cassandra/config/Config.java | 6 +-
.../cassandra/config/DatabaseDescriptor.java | 19 ++-
.../org/apache/cassandra/db/Directories.java | 158 +++++++++++--------
src/java/org/apache/cassandra/db/Memtable.java | 5 +
.../db/compaction/AbstractCompactionTask.java | 5 +
.../db/compaction/CompactionManager.java | 4 +-
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../cassandra/io/util/DiskAwareRunnable.java | 4 +-
.../cassandra/streaming/StreamReader.java | 3 +-
.../cassandra/streaming/StreamReceiveTask.java | 3 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 8 +-
.../apache/cassandra/db/DirectoriesTest.java | 8 +-
.../io/sstable/SSTableSimpleWriterTest.java | 2 +-
15 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a3d7a9..abfd680 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1
+ * Add flush directory distinct from compaction directories (CASSANDRA-6357)
* Require JNA by default (CASSANDRA-6575)
* add listsnapshots command to nodetool (CASSANDRA-5742)
* Introduce AtomicBTreeColumns (CASSANDRA-6271)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ddd5e0..8538920 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -109,9 +109,15 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /var/lib/cassandra/data
-# commit log
+# commit log. When running on magnetic HDD, this should be on a
+# separate spindle from the data directories.
commitlog_directory: /var/lib/cassandra/commitlog
+# location to write flushed sstables to. Ideally, this will also be
+# on a separate spindle in HDD deployments. If you only have two spindles,
+# have it share the data spindle.
+flush_directory: /var/lib/cassandra/flush
+
# policy for data disk failures:
# stop: shut down gossip and Thrift, leaving the node effectively dead, but
# can still be inspected via JMX.
@@ -296,10 +302,9 @@ memtable_cleanup_threshold: 0.4
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
-# while blocked. If you have a large heap and many data directories,
-# you can increase this value for better flush performance.
-# By default this will be set to the amount of data directories defined.
-#memtable_flush_writers: 1
+# while blocked. If your flush directory is backed by SSD, you may
+# want to increase this; by default it will be set to 2.
+#memtable_flush_writers: 2
# A fixed memory pool size in MB for for SSTable index summaries. If left
# empty, this will default to 5% of the heap size. If the memory usage of
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2eb9b18..1f47171 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -77,7 +77,10 @@ public class Config
@Deprecated
public Integer concurrent_replicates = null;
- public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
+ // we don't want a lot of contention, but we also don't want to starve all other tables
+ // if a big one flushes. OS buffering should be able to minimize contention with 2 threads.
+ public int memtable_flush_writers = 2;
+
public Integer memtable_total_space_in_mb;
public float memtable_cleanup_threshold = 0.4f;
@@ -123,6 +126,7 @@ public class Config
public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
public String[] data_file_directories;
+ public String flush_directory;
public String saved_caches_directory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 378fa8a..29dece6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -264,15 +264,8 @@ public class DatabaseDescriptor
throw new ConfigurationException("memtable_heap_space_in_mb must be positive");
logger.info("Global memtable heap threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
- /* Memtable flush writer threads */
- if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
- {
+ if (conf.memtable_flush_writers < 1)
throw new ConfigurationException("memtable_flush_writers must be at least 1");
- }
- else if (conf.memtable_flush_writers == null)
- {
- conf.memtable_flush_writers = conf.data_file_directories.length;
- }
/* Local IP or hostname to bind services to */
if (conf.listen_address != null)
@@ -404,6 +397,9 @@ public class DatabaseDescriptor
/* data file and commit log directories. they get created later, when they're needed. */
if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
{
+ if (conf.flush_directory == null)
+ conf.flush_directory = conf.data_file_directories[0];
+
for (String datadir : conf.data_file_directories)
{
if (datadir.equals(conf.commitlog_directory))
@@ -613,6 +609,8 @@ public class DatabaseDescriptor
throw new ConfigurationException("saved_caches_directory must be specified");
FileUtils.createDirectory(conf.saved_caches_directory);
+
+ FileUtils.createDirectory(conf.flush_directory);
}
catch (ConfigurationException e)
{
@@ -1404,4 +1402,9 @@ public class DatabaseDescriptor
String arch = System.getProperty("os.arch");
return arch.contains("64") || arch.contains("sparcv9");
}
+
+ public static String getFlushLocation()
+ {
+ return conf.flush_directory;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 6cd9da1..a6b7efa 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.db;
-import static com.google.common.collect.Sets.newHashSet;
-
import java.io.File;
import java.io.FileFilter;
import java.io.IOError;
@@ -33,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -40,20 +39,26 @@ import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
+import static com.google.common.collect.Sets.newHashSet;
+
/**
* Encapsulate handling of paths to the data files.
*
@@ -86,13 +91,15 @@ public class Directories
public static final String SNAPSHOT_SUBDIR = "snapshots";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
- public static final DataDirectory[] dataFileLocations;
+ public static final DataDirectory[] dataDirectories;
+ public static final DataDirectory flushDirectory;
static
{
String[] locations = DatabaseDescriptor.getAllDataFileLocations();
- dataFileLocations = new DataDirectory[locations.length];
+ dataDirectories = new DataDirectory[locations.length];
for (int i = 0; i < locations.length; ++i)
- dataFileLocations[i] = new DataDirectory(new File(locations[i]));
+ dataDirectories[i] = new DataDirectory(new File(locations[i]));
+ flushDirectory = new DataDirectory(new File(DatabaseDescriptor.getFlushLocation()));
}
@@ -170,7 +177,8 @@ public class Directories
}
private final CFMetaData metadata;
- private final File[] sstableDirectories;
+ private final File[] dataPaths;
+ private final File flushPath;
/**
* Create Directories of given ColumnFamily.
@@ -181,57 +189,77 @@ public class Directories
public Directories(CFMetaData metadata)
{
this.metadata = metadata;
- this.sstableDirectories = new File[dataFileLocations.length];
+ if (StorageService.instance.isClientMode())
+ {
+ dataPaths = null;
+ flushPath = null;
+ return;
+ }
+
+ String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
+ int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
+ // secondary indices go in the same directory as the base cf
+ String directoryName = idx > 0 ? metadata.cfName.substring(0, idx) + "-" + cfId : metadata.cfName + "-" + cfId;
- // Determine SSTable directories
- // If upgraded from version less than 2.1, use directories already exist.
- for (int i = 0; i < dataFileLocations.length; ++i)
+ this.dataPaths = new File[dataDirectories.length];
+ // If upgraded from version less than 2.1, use existing directories
+ for (int i = 0; i < dataDirectories.length; ++i)
{
// check if old SSTable directory exists
- sstableDirectories[i] = new File(dataFileLocations[i].location, join(metadata.ksName, metadata.cfName));
+ dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, this.metadata.cfName));
}
- boolean olderDirectoryExists = Iterables.any(Arrays.asList(sstableDirectories), new Predicate<File>()
+ boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
{
public boolean apply(File file)
{
return file.exists();
}
});
- if (olderDirectoryExists)
- return;
-
- // create directory name
- String directoryName;
- String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
- int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
- if (idx > 0)
- // secondary index, goes in the same directory than the base cf
- directoryName = metadata.cfName.substring(0, idx) + "-" + cfId;
- else
- directoryName = metadata.cfName + "-" + cfId;
+ if (!olderDirectoryExists)
+ {
+ // use 2.1-style path names
+ for (int i = 0; i < dataDirectories.length; ++i)
+ dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, directoryName));
+ }
- for (int i = 0; i < dataFileLocations.length; ++i)
- sstableDirectories[i] = new File(dataFileLocations[i].location, join(metadata.ksName, directoryName));
+ flushPath = new File(flushDirectory.location, join(metadata.ksName, directoryName));
- if (!StorageService.instance.isClientMode())
+ for (File dir : allSSTablePaths())
{
- for (File dir : sstableDirectories)
+ try
{
- try
- {
- FileUtils.createDirectory(dir);
- }
- catch (FSError e)
- {
- // don't just let the default exception handler do this, we need the create loop to continue
- logger.error("Failed to create {} directory", dir);
- FileUtils.handleFSError(e);
- }
+ FileUtils.createDirectory(dir);
+ }
+ catch (FSError e)
+ {
+ // don't just let the default exception handler do this, we need the create loop to continue
+ logger.error("Failed to create {} directory", dir);
+ FileUtils.handleFSError(e);
}
}
}
/**
+ * @return an iterable of all possible sstable paths, including flush and post-compaction locations.
+ * Guaranteed to only return one copy of each path, even if there is no dedicated flush location and
+ * it shares with the others.
+ */
+ private Iterable<File> allSSTablePaths()
+ {
+ return ImmutableSet.<File>builder().add(dataPaths).add(flushPath).build();
+ }
+
+ /**
+ * @return an iterable of all possible sstable directories, including flush and post-compaction locations.
+ * Guaranteed to only return one copy of each directory, even if there is no dedicated flush location and
+ * it shares with the others.
+ */
+ private static Iterable<DataDirectory> allSSTableDirectories()
+ {
+ return ImmutableSet.<DataDirectory>builder().add(dataDirectories).add(flushDirectory).build();
+ }
+
+ /**
* Returns SSTable location which is inside given data directory.
*
* @param dataDirectory
@@ -239,7 +267,7 @@ public class Directories
*/
public File getLocationForDisk(DataDirectory dataDirectory)
{
- for (File dir : sstableDirectories)
+ for (File dir : allSSTablePaths())
{
if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
return dir;
@@ -249,7 +277,7 @@ public class Directories
public Descriptor find(String filename)
{
- for (File dir : sstableDirectories)
+ for (File dir : allSSTablePaths())
{
if (new File(dir, filename).exists())
return Descriptor.fromFilename(dir, filename).left;
@@ -257,9 +285,9 @@ public class Directories
return null;
}
- public File getDirectoryForNewSSTables()
+ public File getDirectoryForCompactedSSTables()
{
- File path = getWriteableLocationAsFile();
+ File path = getCompactionLocationAsFile();
// Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm
if (path == null
@@ -272,15 +300,15 @@ public class Directories
// Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
SSTableDeletingTask.rescheduleFailedTasks();
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
- path = getWriteableLocationAsFile();
+ path = getCompactionLocationAsFile();
}
return path;
}
- public File getWriteableLocationAsFile()
+ public File getCompactionLocationAsFile()
{
- return getLocationForDisk(getWriteableLocation());
+ return getLocationForDisk(getCompactionLocation());
}
/**
@@ -288,12 +316,12 @@ public class Directories
*
* @throws IOError if all directories are blacklisted.
*/
- public DataDirectory getWriteableLocation()
+ public DataDirectory getCompactionLocation()
{
List<DataDirectory> candidates = new ArrayList<>();
// pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
- for (DataDirectory dataDir : dataFileLocations)
+ for (DataDirectory dataDir : dataDirectories)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
continue;
@@ -318,6 +346,12 @@ public class Directories
return candidates.get(0);
}
+ public DataDirectory getFlushLocation()
+ {
+ return BlacklistedDirectories.isUnwritable(flushPath)
+ ? getCompactionLocation()
+ : flushDirectory;
+ }
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
@@ -430,7 +464,7 @@ public class Directories
if (filtered)
return;
- for (File location : sstableDirectories)
+ for (File location : allSSTablePaths())
{
if (BlacklistedDirectories.isUnreadable(location))
continue;
@@ -492,7 +526,7 @@ public class Directories
public Map<String, Pair<Long, Long>> getSnapshotDetails()
{
final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>();
- for (final File dir : sstableDirectories)
+ for (final File dir : allSSTablePaths())
{
final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR);
if (snapshotDir.exists() && snapshotDir.isDirectory())
@@ -522,7 +556,7 @@ public class Directories
}
public boolean snapshotExists(String snapshotName)
{
- for (File dir : sstableDirectories)
+ for (File dir : allSSTablePaths())
{
File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, snapshotName));
if (snapshotDir.exists())
@@ -550,7 +584,7 @@ public class Directories
// The snapshot must exist
public long snapshotCreationTime(String snapshotName)
{
- for (File dir : sstableDirectories)
+ for (File dir : allSSTablePaths())
{
File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, snapshotName));
if (snapshotDir.exists())
@@ -562,7 +596,7 @@ public class Directories
public long trueSnapshotsSize()
{
long result = 0L;
- for (File dir : sstableDirectories)
+ for (File dir : allSSTablePaths())
result += getTrueAllocatedSizeIn(new File(dir, join(SNAPSHOT_SUBDIR)));
return result;
}
@@ -594,7 +628,7 @@ public class Directories
public static List<File> getKSChildDirectories(String ksName)
{
List<File> result = new ArrayList<>();
- for (DataDirectory dataDirectory : dataFileLocations)
+ for (DataDirectory dataDirectory : allSSTableDirectories())
{
File ksDir = new File(dataDirectory.location, ksName);
File[] cfDirs = ksDir.listFiles();
@@ -612,7 +646,7 @@ public class Directories
public List<File> getCFDirectories()
{
List<File> result = new ArrayList<>();
- for (File dataDirectory : sstableDirectories)
+ for (File dataDirectory : allSSTablePaths())
{
if (dataDirectory.isDirectory())
result.add(dataDirectory);
@@ -640,19 +674,19 @@ public class Directories
return StringUtils.join(s, File.separator);
}
- // Hack for tests, don't use otherwise
+ @VisibleForTesting
static void overrideDataDirectoriesForTest(String loc)
{
- for (int i = 0; i < dataFileLocations.length; ++i)
- dataFileLocations[i] = new DataDirectory(new File(loc));
+ for (int i = 0; i < dataDirectories.length; ++i)
+ dataDirectories[i] = new DataDirectory(new File(loc));
}
- // Hack for tests, don't use otherwise
+ @VisibleForTesting
static void resetDataDirectoriesAfterTest()
{
String[] locations = DatabaseDescriptor.getAllDataFileLocations();
for (int i = 0; i < locations.length; ++i)
- dataFileLocations[i] = new DataDirectory(new File(locations[i]));
+ dataDirectories[i] = new DataDirectory(new File(locations[i]));
}
private class TrueFilesSizeVisitor extends SimpleFileVisitor<Path>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 9e76e6f..246639e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -304,6 +304,11 @@ public class Memtable
* 1.2); // bloom filter and row index overhead
}
+ protected Directories.DataDirectory getWriteableLocation()
+ {
+ return cfs.directories.getFlushLocation();
+ }
+
public long getExpectedWriteSize()
{
return estimatedSize;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index a4c101f..f1fcbd1 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -49,6 +49,11 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
}
+ protected Directories.DataDirectory getWriteableLocation()
+ {
+ return cfs.directories.getCompactionLocation();
+ }
+
/**
* executes the task and unmarks sstables compacting
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6c09efb..bfffc8f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -626,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables();
+ File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -919,7 +919,7 @@ public class CompactionManager implements CompactionManagerMBean
}
logger.info("Anticompacting {}", sstable);
- File destination = cfs.directories.getDirectoryForNewSSTables();
+ File destination = cfs.directories.getDirectoryForCompactedSSTables();
SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e1da811..3a71136 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -78,7 +78,7 @@ public class Scrubber implements Closeable
this.skipCorrupted = skipCorrupted;
// Calculate the expected compacted filesize
- this.destination = cfs.directories.getDirectoryForNewSSTables();
+ this.destination = cfs.directories.getDirectoryForCompactedSSTables();
if (destination == null)
throw new IOException("disk full");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..f0e2756 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,7 +34,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
while (true)
{
writeSize = getExpectedWriteSize();
- directory = getDirectories().getWriteableLocation();
+ directory = getWriteableLocation();
if (directory != null || !reduceScopeForLimitedSpace())
break;
}
@@ -54,6 +54,8 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
}
}
+ protected abstract Directories.DataDirectory getWriteableLocation();
+
/**
* Get sstable directories for the CF.
* @return Directories instance for the CF.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index d805bf3..c26a61b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -111,7 +110,7 @@ public class StreamReader
protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
{
- Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+ Directories.DataDirectory localDir = cfs.directories.getCompactionLocation();
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b4d5392..e5ef691 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
@@ -98,7 +97,7 @@ public class StreamReceiveTask extends StreamTask
Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
+ StreamLockfile lockfile = new StreamLockfile(cfs.directories.getCompactionLocationAsFile(), UUID.randomUUID());
lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 5c71d9b..200925c 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -870,7 +870,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
for (int version = 1; version <= 2; ++version)
{
- Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), "Keyspace2", "Standard1", version, false);
+ Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false);
Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
@@ -1576,7 +1576,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ByteBuffer key = bytes("key");
// 1st sstable
- SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(), cfmeta, StorageService.getPartitioner());
writer.newRow(key);
writer.addColumn(bytes("col"), bytes("val"), 1);
writer.close();
@@ -1588,7 +1588,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
// simulate incomplete compaction
- writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(),
cfmeta, StorageService.getPartitioner())
{
protected SSTableWriter getWriter()
@@ -1643,7 +1643,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// Write SSTable generation 3 that has ancestors 1 and 2
final Set<Integer> ancestors = Sets.newHashSet(1, 2);
- SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(),
cfmeta, StorageService.getPartitioner())
{
protected SSTableWriter getWriter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index a9cd2a3..c831b29 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -116,7 +116,7 @@ public class DirectoriesTest
for (CFMetaData cfm : CFM)
{
Directories directories = new Directories(cfm);
- assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
+ assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
@@ -183,7 +183,7 @@ public class DirectoriesTest
{
DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.best_effort);
- for (DataDirectory dd : Directories.dataFileLocations)
+ for (DataDirectory dd : Directories.dataDirectories)
{
dd.location.setExecutable(false);
dd.location.setWritable(false);
@@ -199,7 +199,7 @@ public class DirectoriesTest
}
finally
{
- for (DataDirectory dd : Directories.dataFileLocations)
+ for (DataDirectory dd : Directories.dataDirectories)
{
dd.location.setExecutable(true);
dd.location.setWritable(true);
@@ -215,7 +215,7 @@ public class DirectoriesTest
for (final CFMetaData cfm : CFM)
{
final Directories directories = new Directories(cfm);
- assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
+ assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
final String n = Long.toString(System.nanoTime());
Callable<File> directoryGetter = new Callable<File>() {
public File call() throws Exception {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
index 15980a4..5995fe3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
@@ -44,7 +44,7 @@ public class SSTableSimpleWriterTest extends SchemaLoader
String cfname = "StandardInteger1";
Keyspace t = Keyspace.open(keyspaceName); // make sure we create the directory
- File dir = new Directories(Schema.instance.getCFMetaData(keyspaceName, cfname)).getDirectoryForNewSSTables();
+ File dir = new Directories(Schema.instance.getCFMetaData(keyspaceName, cfname)).getDirectoryForCompactedSSTables();
assert dir.exists();
IPartitioner partitioner = StorageService.getPartitioner();