Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/24 10:38:08 UTC
cassandra git commit: Make LCS split compaction results over many directories
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 025b4060c -> 2ce1ad8e6
Make LCS split compaction results over many directories
Patch by marcuse; reviewed by yukim for CASSANDRA-8329
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce1ad8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce1ad8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce1ad8e
Branch: refs/heads/cassandra-2.0
Commit: 2ce1ad8e6f5d3c5cf781e1ff87cda4f61c89d9ee
Parents: 025b406
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 11:01:17 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 09:43:47 2014 +0100
----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  | 20 ++++--
 .../cassandra/db/compaction/CompactionTask.java | 74 ++++++++++++--------
 .../cassandra/io/util/DiskAwareRunnable.java    | 37 +++-------
 4 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01ea887..6a5ac0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
* Fix some failing queries that use multi-column relations
on COMPACT STORAGE tables (CASSANDRA-8264)
* Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 0b186dc..425b352 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -336,13 +336,23 @@ public class Memtable
return estimatedSize;
}
- protected void runWith(File sstableDirectory) throws Exception
+ protected void runMayThrow() throws Exception
{
+ long writeSize = getExpectedWriteSize();
+ Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+ File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
assert sstableDirectory != null : "Flush task is not bound to any disk";
-
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
- latch.countDown();
+ try
+ {
+ SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+ cfs.replaceFlushed(Memtable.this, sstable);
+ latch.countDown();
+ }
+ finally
+ {
+ if (dataDirectory != null)
+ returnWriteDirectory(dataDirectory, writeSize);
+ }
}
protected Directories getDirectories()
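
In short: the flush task is no longer handed a directory by DiskAwareRunnable's old runWith(File) template method; it reserves one itself, sized to the expected sstable, and releases the reservation in a finally block. A minimal standalone Java sketch of that shape, using simplified stand-ins (reservedPerDir, reserve() and release() are illustrative only, not the actual Cassandra API):

import java.util.concurrent.atomic.AtomicLong;

public class FlushShapeSketch
{
    // Hypothetical stand-in for the per-directory bookkeeping in Directories.DataDirectory.
    static final AtomicLong[] reservedPerDir = { new AtomicLong(), new AtomicLong() };

    // Stand-in for getWriteDirectory(writeSize): pick the least-loaded directory and reserve space on it.
    static int reserve(long writeSize)
    {
        int best = 0;
        for (int i = 1; i < reservedPerDir.length; i++)
            if (reservedPerDir[i].get() < reservedPerDir[best].get())
                best = i;
        reservedPerDir[best].addAndGet(writeSize);
        return best;
    }

    // Stand-in for returnWriteDirectory(directory, writeSize): drop the reservation.
    static void release(int dir, long writeSize)
    {
        reservedPerDir[dir].addAndGet(-writeSize);
    }

    // Shape of the rewritten Memtable.runMayThrow(): reserve, write, release in finally.
    static void flush(long expectedWriteSize)
    {
        int dir = reserve(expectedWriteSize);
        try
        {
            System.out.println("writing flushed sstable to data directory #" + dir);
        }
        finally
        {
            release(dir, expectedWriteSize);
        }
    }

    public static void main(String[] args)
    {
        flush(64L << 20); // pretend the memtable flushes to roughly 64 MB on disk
    }
}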
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 5ef4aad..08fe81a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -87,11 +87,11 @@ public class CompactionTask extends AbstractCompactionTask
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
- protected void runWith(File sstableDirectory) throws Exception
+ protected void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null && sstableDirectory != null;
+ assert sstables != null;
// Note that the current compaction strategy, is not necessarily the one this task was created under.
// This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
@@ -149,45 +149,60 @@ public class CompactionTask extends AbstractCompactionTask
return;
}
- SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+ long writeSize = getExpectedWriteSize() / estimatedSSTables;
+ Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+ SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
writers.add(writer);
- while (iter.hasNext())
+ try
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
-
- AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
+ while (iter.hasNext())
{
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
- totalkeysWritten++;
+ AbstractCompactedRow row = iter.next();
+ RowIndexEntry indexEntry = writer.append(row);
+ if (indexEntry == null)
+ {
+ controller.invalidateCachedRow(row.key);
+ row.close();
+ continue;
+ }
- if (DatabaseDescriptor.getPreheatKeyCache())
- {
- for (SSTableReader sstable : actuallyCompact)
+ totalkeysWritten++;
+
+ if (DatabaseDescriptor.getPreheatKeyCache())
{
- if (sstable.getCachedPosition(row.key, false) != null)
+ for (SSTableReader sstable : actuallyCompact)
{
- cachedKeys.put(row.key, indexEntry);
- break;
+ if (sstable.getCachedPosition(row.key, false) != null)
+ {
+ cachedKeys.put(row.key, indexEntry);
+ break;
+ }
}
}
- }
- if (newSSTableSegmentThresholdReached(writer))
- {
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ if (newSSTableSegmentThresholdReached(writer))
+ {
+ // tmp = false because later we want to query it with descriptor from SSTableReader
+ cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
+ returnWriteDirectory(dataDirectory, writeSize);
+ // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
+ dataDirectory = null;
+ writeSize = getExpectedWriteSize() / estimatedSSTables;
+ dataDirectory = getWriteDirectory(writeSize);
+ writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
+ writers.add(writer);
+ cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ }
}
}
+ finally
+ {
+ if (dataDirectory != null)
+ returnWriteDirectory(dataDirectory, writeSize);
+ }
if (writer.getFilePointer() > 0)
{
@@ -291,6 +306,7 @@ public class CompactionTask extends AbstractCompactionTask
private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
{
+ assert sstableDirectory != null;
return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
keysPerSSTable,
cfs.metadata,
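
The subtle part of the hunk above is the hand-off between output sstables: the task returns its current directory reservation, nulls the local reference so the finally block cannot return it twice if the next getWriteDirectory() call throws, and only then reserves space for the next writer. A standalone sketch of that control flow, reusing the same hypothetical reserve()/release() stand-ins as in the flush sketch (row counts, sizes and the rotation threshold are made up for illustration):

import java.util.concurrent.atomic.AtomicLong;

public class CompactionShapeSketch
{
    // Same hypothetical per-directory reservation counters as in the flush sketch.
    static final AtomicLong[] reservedPerDir = { new AtomicLong(), new AtomicLong() };

    static int reserve(long writeSize)
    {
        int best = 0;
        for (int i = 1; i < reservedPerDir.length; i++)
            if (reservedPerDir[i].get() < reservedPerDir[best].get())
                best = i;
        reservedPerDir[best].addAndGet(writeSize);
        return best;
    }

    static void release(int dir, long writeSize)
    {
        reservedPerDir[dir].addAndGet(-writeSize);
    }

    public static void main(String[] args)
    {
        long totalExpected = 1_000;          // stand-in for getExpectedWriteSize()
        long estimatedSSTables = 4;
        long perSSTable = totalExpected / estimatedSSTables;
        long segmentThreshold = perSSTable;  // stand-in for newSSTableSegmentThresholdReached()

        long writtenInSegment = 0;
        Integer dir = reserve(perSSTable);   // reservation for the first output sstable
        try
        {
            for (int row = 0; row < 1_000; row++)
            {
                writtenInSegment += 1;       // pretend each appended row writes one byte
                if (writtenInSegment >= segmentThreshold)
                {
                    // Switch to a new output sstable: release the current reservation first,
                    // and null the reference so the finally block cannot release it twice
                    // if the next reserve() were to throw.
                    release(dir, perSSTable);
                    dir = null;
                    dir = reserve(perSSTable);
                    writtenInSegment = 0;
                    System.out.println("rotated to data directory #" + dir);
                }
            }
        }
        finally
        {
            if (dir != null)
                release(dir, perSSTable);
        }
    }
}

The net effect is that a large LCS compaction no longer pins all of its output to the single directory chosen up front; each roughly sstable-sized chunk can land on a different data directory.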
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..93b06ab 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -17,23 +17,16 @@
*/
package org.apache.cassandra.io.util;
-import java.io.File;
-
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.utils.WrappedRunnable;
public abstract class DiskAwareRunnable extends WrappedRunnable
{
- /**
- * Run this task after selecting the optimal disk for it
- */
- protected void runMayThrow() throws Exception
+ protected Directories.DataDirectory getWriteDirectory(long writeSize)
{
- long writeSize;
Directories.DataDirectory directory;
while (true)
{
- writeSize = getExpectedWriteSize();
directory = getDirectories().getWriteableLocation();
if (directory != null || !reduceScopeForLimitedSpace())
break;
@@ -43,15 +36,13 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
directory.currentTasks.incrementAndGet();
directory.estimatedWorkingSize.addAndGet(writeSize);
- try
- {
- runWith(getDirectories().getLocationForDisk(directory));
- }
- finally
- {
- directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
- directory.currentTasks.decrementAndGet();
- }
+ return directory;
+ }
+
+ protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
+ {
+ directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+ directory.currentTasks.decrementAndGet();
}
/**
@@ -61,18 +52,6 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
protected abstract Directories getDirectories();
/**
- * Executes this task on given {@code sstableDirectory}.
- * @param sstableDirectory sstable directory to work on
- */
- protected abstract void runWith(File sstableDirectory) throws Exception;
-
- /**
- * Get expected write size to determine which disk to use for this task.
- * @return expected size in bytes this task will write to disk.
- */
- public abstract long getExpectedWriteSize();
-
- /**
* Called if no disk is available with free space for the full write size.
* @return true if the scope of the task was successfully reduced.
*/
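
With runWith(File) and the base-class runMayThrow() gone, DiskAwareRunnable is reduced to a pair of helpers, and each subclass now owns the acquire/release pairing itself. A self-contained Java model of that contract under simplified assumptions (Directory, chooseWriteableLocation() and the no-space handling below are stand-ins, not the real Directories machinery):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class DiskAwareSketch
{
    // Simplified stand-in for Directories.DataDirectory.
    static class Directory
    {
        final String path;
        final AtomicInteger currentTasks = new AtomicInteger();
        final AtomicLong estimatedWorkingSize = new AtomicLong();
        Directory(String path) { this.path = path; }
    }

    static abstract class DiskAwareTask
    {
        // Stand-in for Directories.getWriteableLocation(): null means no directory has room.
        abstract Directory chooseWriteableLocation();

        // Stand-in for reduceScopeForLimitedSpace(): try to shrink the task; false means give up.
        abstract boolean reduceScopeForLimitedSpace();

        Directory getWriteDirectory(long writeSize)
        {
            Directory directory;
            while (true)
            {
                directory = chooseWriteableLocation();
                if (directory != null || !reduceScopeForLimitedSpace())
                    break;
            }
            if (directory == null)
                throw new RuntimeException("insufficient space for " + writeSize + " bytes"); // sketch-only handling
            directory.currentTasks.incrementAndGet();
            directory.estimatedWorkingSize.addAndGet(writeSize);
            return directory;
        }

        void returnWriteDirectory(Directory directory, long writeSize)
        {
            directory.estimatedWorkingSize.addAndGet(-writeSize);
            directory.currentTasks.decrementAndGet();
        }
    }

    public static void main(String[] args)
    {
        final Directory d1 = new Directory("/data1");
        DiskAwareTask task = new DiskAwareTask()
        {
            Directory chooseWriteableLocation() { return d1; }
            boolean reduceScopeForLimitedSpace() { return false; }
        };
        long writeSize = 128L << 20;
        Directory dir = task.getWriteDirectory(writeSize); // acquire: counters go up
        try
        {
            System.out.println("writing under " + dir.path + ", tasks=" + dir.currentTasks.get());
        }
        finally
        {
            task.returnWriteDirectory(dir, writeSize);     // release: counters go back down
        }
    }
}

Pairing every getWriteDirectory() with a returnWriteDirectory() in a finally block keeps estimatedWorkingSize and currentTasks balanced, the counters that account for in-flight writes when a directory is being chosen.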