Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/24 10:38:08 UTC

cassandra git commit: Make LCS split compaction results over many directories

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 025b4060c -> 2ce1ad8e6


Make LCS split compaction results over many directories

Patch by marcuse; reviewed by yukim for CASSANDRA-8329


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce1ad8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce1ad8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce1ad8e

Branch: refs/heads/cassandra-2.0
Commit: 2ce1ad8e6f5d3c5cf781e1ff87cda4f61c89d9ee
Parents: 025b406
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 11:01:17 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 09:43:47 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  | 20 ++++--
 .../cassandra/db/compaction/CompactionTask.java | 74 ++++++++++++--------
 .../cassandra/io/util/DiskAwareRunnable.java    | 37 +++-------
 4 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01ea887..6a5ac0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)
  * Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 0b186dc..425b352 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -336,13 +336,23 @@ public class Memtable
             return estimatedSize;
         }
 
-        protected void runWith(File sstableDirectory) throws Exception
+        protected void runMayThrow() throws Exception
         {
+            long writeSize = getExpectedWriteSize();
+            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+            File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
             assert sstableDirectory != null : "Flush task is not bound to any disk";
-
-            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstable);
-            latch.countDown();
+            try
+            {
+                SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+                cfs.replaceFlushed(Memtable.this, sstable);
+                latch.countDown();
+            }
+            finally
+            {
+                if (dataDirectory != null)
+                    returnWriteDirectory(dataDirectory, writeSize);
+            }
         }
 
         protected Directories getDirectories()
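
In prose, the Memtable change above: the flush task now reserves a data directory itself (sized by getExpectedWriteSize()), writes the sstable, and releases the reservation in a finally block. A minimal, hypothetical standalone sketch of that shape follows -- FlushShapeSketch and writeSortedContents are illustrative stand-ins, not Cassandra APIs, and the single AtomicLong collapses the per-directory bookkeeping into one counter:

    import java.util.concurrent.atomic.AtomicLong;

    class FlushShapeSketch
    {
        static final AtomicLong estimatedWorkingSize = new AtomicLong();

        static void flush(long expectedWriteSize) throws Exception
        {
            // Reserve first, outside the try: if the reservation itself
            // fails, there is nothing to release.
            estimatedWorkingSize.addAndGet(expectedWriteSize);
            try
            {
                writeSortedContents(); // stand-in for the real flush write
            }
            finally
            {
                // Runs on success and on failure, so the reservation is
                // always returned exactly once.
                estimatedWorkingSize.addAndGet(-expectedWriteSize);
            }
        }

        static void writeSortedContents() throws Exception { /* stand-in */ }
    }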

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 5ef4aad..08fe81a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -87,11 +87,11 @@ public class CompactionTask extends AbstractCompactionTask
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
-    protected void runWith(File sstableDirectory) throws Exception
+    protected void runMayThrow() throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null && sstableDirectory != null;
+        assert sstables != null;
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
         // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
@@ -149,45 +149,60 @@ public class CompactionTask extends AbstractCompactionTask
                 return;
             }
 
-            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+            long writeSize = getExpectedWriteSize() / estimatedSSTables;
+            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+            SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
             writers.add(writer);
-            while (iter.hasNext())
+            try
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                AbstractCompactedRow row = iter.next();
-                RowIndexEntry indexEntry = writer.append(row);
-                if (indexEntry == null)
+                while (iter.hasNext())
                 {
-                    controller.invalidateCachedRow(row.key);
-                    row.close();
-                    continue;
-                }
+                    if (ci.isStopRequested())
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                totalkeysWritten++;
+                    AbstractCompactedRow row = iter.next();
+                    RowIndexEntry indexEntry = writer.append(row);
+                    if (indexEntry == null)
+                    {
+                        controller.invalidateCachedRow(row.key);
+                        row.close();
+                        continue;
+                    }
 
-                if (DatabaseDescriptor.getPreheatKeyCache())
-                {
-                    for (SSTableReader sstable : actuallyCompact)
+                    totalkeysWritten++;
+
+                    if (DatabaseDescriptor.getPreheatKeyCache())
                     {
-                        if (sstable.getCachedPosition(row.key, false) != null)
+                        for (SSTableReader sstable : actuallyCompact)
                         {
-                            cachedKeys.put(row.key, indexEntry);
-                            break;
+                            if (sstable.getCachedPosition(row.key, false) != null)
+                            {
+                                cachedKeys.put(row.key, indexEntry);
+                                break;
+                            }
                         }
                     }
-                }
 
-                if (newSSTableSegmentThresholdReached(writer))
-                {
-                    // tmp = false because later we want to query it with descriptor from SSTableReader
-                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
-                    writers.add(writer);
-                    cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+                    if (newSSTableSegmentThresholdReached(writer))
+                    {
+                        // tmp = false because later we want to query it with descriptor from SSTableReader
+                        cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
+                        returnWriteDirectory(dataDirectory, writeSize);
+                        // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
+                        dataDirectory = null;
+                        writeSize = getExpectedWriteSize() / estimatedSSTables;
+                        dataDirectory = getWriteDirectory(writeSize);
+                        writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
+                        writers.add(writer);
+                        cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+                    }
                 }
             }
+            finally
+            {
+                if (dataDirectory != null)
+                    returnWriteDirectory(dataDirectory, writeSize);
+            }
 
             if (writer.getFilePointer() > 0)
             {
@@ -291,6 +306,7 @@ public class CompactionTask extends AbstractCompactionTask
 
     private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
     {
+        assert sstableDirectory != null;
         return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
                                  keysPerSSTable,
                                  cfs.metadata,
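
The heart of the CASSANDRA-8329 change is in the hunk above: instead of binding the whole compaction to one disk up front, the task re-acquires a (possibly different) data directory each time it rolls over to a new output sstable, which is what spreads LCS output across data directories. A standalone, illustrative-only skeleton of that rotation -- acquireDirectory, releaseDirectory and writeOneSSTable are hypothetical stand-ins for getWriteDirectory, returnWriteDirectory and the real append loop, and rotating once per estimated sstable simplifies the newSSTableSegmentThresholdReached check:

    import java.io.File;

    class CompactionRotationSketch
    {
        void compact(long totalWriteSize, int estimatedSSTables) throws Exception
        {
            long writeSize = totalWriteSize / Math.max(1, estimatedSSTables);
            File dataDirectory = acquireDirectory(writeSize);
            try
            {
                for (int i = 0; i < estimatedSSTables; i++)
                {
                    writeOneSSTable(dataDirectory);
                    if (i == estimatedSSTables - 1)
                        break;
                    // Release the current disk before picking the next one,
                    // so each output sstable can land on a different
                    // data directory.
                    releaseDirectory(dataDirectory, writeSize);
                    // Null it out so the finally block cannot release twice
                    // if the re-acquire below throws -- the same trick the
                    // patch uses.
                    dataDirectory = null;
                    dataDirectory = acquireDirectory(writeSize);
                }
            }
            finally
            {
                if (dataDirectory != null)
                    releaseDirectory(dataDirectory, writeSize);
            }
        }

        File acquireDirectory(long writeSize) { return new File("/tmp"); } // stand-in
        void releaseDirectory(File dir, long writeSize) {}                 // stand-in
        void writeOneSSTable(File dir) throws Exception {}                 // stand-in
    }

Nulling the reference between release and re-acquire is what keeps the finally block from double-decrementing the bookkeeping if the re-acquire throws.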

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..93b06ab 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -17,23 +17,16 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.File;
-
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public abstract class DiskAwareRunnable extends WrappedRunnable
 {
-    /**
-     * Run this task after selecting the optimal disk for it
-     */
-    protected void runMayThrow() throws Exception
+    protected Directories.DataDirectory getWriteDirectory(long writeSize)
     {
-        long writeSize;
         Directories.DataDirectory directory;
         while (true)
         {
-            writeSize = getExpectedWriteSize();
             directory = getDirectories().getWriteableLocation();
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
@@ -43,15 +36,13 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
 
         directory.currentTasks.incrementAndGet();
         directory.estimatedWorkingSize.addAndGet(writeSize);
-        try
-        {
-            runWith(getDirectories().getLocationForDisk(directory));
-        }
-        finally
-        {
-            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
-            directory.currentTasks.decrementAndGet();
-        }
+        return directory;
+    }
+
+    protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
+    {
+        directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+        directory.currentTasks.decrementAndGet();
     }
 
     /**
@@ -61,18 +52,6 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
     protected abstract Directories getDirectories();
 
     /**
-     * Executes this task on given {@code sstableDirectory}.
-     * @param sstableDirectory sstable directory to work on
-     */
-    protected abstract void runWith(File sstableDirectory) throws Exception;
-
-    /**
-     * Get expected write size to determine which disk to use for this task.
-     * @return expected size in bytes this task will write to disk.
-     */
-    public abstract long getExpectedWriteSize();
-
-    /**
      * Called if no disk is available with free space for the full write size.
      * @return true if the scope of the task was successfully reduced.
      */
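
After this patch, DiskAwareRunnable is reduced to a reserve/release pair that subclasses call directly from runMayThrow(). A simplified, hypothetical model of that bookkeeping follows -- DataDir and WriteDirectoryBroker are stand-ins for Directories.DataDirectory and the selection done by getWriteableLocation; the "most estimated free space" policy and the capacity field are illustrative assumptions, and the reduceScopeForLimitedSpace retry loop is omitted:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    class DataDir
    {
        final String path;
        final long capacity;
        final AtomicInteger currentTasks = new AtomicInteger();
        final AtomicLong estimatedWorkingSize = new AtomicLong();

        DataDir(String path, long capacity)
        {
            this.path = path;
            this.capacity = capacity;
        }

        // Space left once all in-flight writes reserved here complete.
        long estimatedFree() { return capacity - estimatedWorkingSize.get(); }
    }

    class WriteDirectoryBroker
    {
        private final List<DataDir> dirs;

        WriteDirectoryBroker(List<DataDir> dirs) { this.dirs = dirs; }

        // Mirrors getWriteDirectory(long): pick a disk, then record the
        // reservation so concurrent tasks see the pending write.
        DataDir getWriteDirectory(long writeSize)
        {
            DataDir best = null;
            for (DataDir d : dirs)
                if (d.estimatedFree() >= writeSize
                    && (best == null || d.estimatedFree() > best.estimatedFree()))
                    best = d;
            if (best == null)
                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
            best.currentTasks.incrementAndGet();
            best.estimatedWorkingSize.addAndGet(writeSize);
            return best;
        }

        // Mirrors returnWriteDirectory(...): undo exactly what the
        // reservation did.
        void returnWriteDirectory(DataDir dir, long writeSize)
        {
            dir.estimatedWorkingSize.addAndGet(-writeSize);
            dir.currentTasks.decrementAndGet();
        }
    }

Because the reservation counters are adjusted as each writer opens and closes, concurrent flushes and compactions see each other's pending writes when choosing a disk, which is what lets the rotation in CompactionTask distribute output without a central scheduler.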