Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/24 10:39:30 UTC

[1/4] cassandra git commit: Make LCS split compaction results over many directories

Repository: cassandra
Updated Branches:
  refs/heads/trunk 528cc3dd1 -> 065aeeb4a


Make LCS split compaction results over many directories

Patch by marcuse; reviewed by yukim for CASSANDRA-8329


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce1ad8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce1ad8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce1ad8e

Branch: refs/heads/trunk
Commit: 2ce1ad8e6f5d3c5cf781e1ff87cda4f61c89d9ee
Parents: 025b406
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 11:01:17 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 09:43:47 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  | 20 ++++--
 .../cassandra/db/compaction/CompactionTask.java | 74 ++++++++++++--------
 .../cassandra/io/util/DiskAwareRunnable.java    | 37 +++-------
 4 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01ea887..6a5ac0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)
  * Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 0b186dc..425b352 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -336,13 +336,23 @@ public class Memtable
             return estimatedSize;
         }
 
-        protected void runWith(File sstableDirectory) throws Exception
+        protected void runMayThrow() throws Exception
         {
+            long writeSize = getExpectedWriteSize();
+            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+            File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
             assert sstableDirectory != null : "Flush task is not bound to any disk";
-
-            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstable);
-            latch.countDown();
+            try
+            {
+                SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+                cfs.replaceFlushed(Memtable.this, sstable);
+                latch.countDown();
+            }
+            finally
+            {
+                if (dataDirectory != null)
+                    returnWriteDirectory(dataDirectory, writeSize);
+            }
         }
 
         protected Directories getDirectories()
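
The flush path above no longer receives a directory from DiskAwareRunnable; it reserves one itself and must release it even if the write fails. The following self-contained sketch models that reserve/use/release bookkeeping. DataDir and WriteReservation are simplified stand-ins for Directories.DataDirectory and the new getWriteDirectory/returnWriteDirectory helpers, not the committed code.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class DataDir
{
    final AtomicInteger currentTasks = new AtomicInteger();
    final AtomicLong estimatedWorkingSize = new AtomicLong();
}

class WriteReservation
{
    // Reserve capacity on a directory before writing, mirroring getWriteDirectory.
    static DataDir reserve(DataDir dir, long writeSize)
    {
        dir.currentTasks.incrementAndGet();
        dir.estimatedWorkingSize.addAndGet(writeSize);
        return dir;
    }

    // Undo the reservation, mirroring returnWriteDirectory.
    static void release(DataDir dir, long writeSize)
    {
        dir.estimatedWorkingSize.addAndGet(-writeSize);
        dir.currentTasks.decrementAndGet();
    }

    public static void main(String[] args)
    {
        DataDir dir = new DataDir();
        long writeSize = 64L << 20; // pretend the flush expects to write 64 MB
        reserve(dir, writeSize);
        try
        {
            // ... write the sstable into the chosen directory ...
        }
        finally
        {
            release(dir, writeSize); // release the reservation even if the write throws
        }
    }
}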

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 5ef4aad..08fe81a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -87,11 +87,11 @@ public class CompactionTask extends AbstractCompactionTask
      * which are properly serialized.
      * Caller is in charge of marking/unmarking the sstables as compacting.
      */
-    protected void runWith(File sstableDirectory) throws Exception
+    protected void runMayThrow() throws Exception
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null && sstableDirectory != null;
+        assert sstables != null;
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
         // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
@@ -149,45 +149,60 @@ public class CompactionTask extends AbstractCompactionTask
                 return;
             }
 
-            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+            long writeSize = getExpectedWriteSize() / estimatedSSTables;
+            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+            SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
             writers.add(writer);
-            while (iter.hasNext())
+            try
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                AbstractCompactedRow row = iter.next();
-                RowIndexEntry indexEntry = writer.append(row);
-                if (indexEntry == null)
+                while (iter.hasNext())
                 {
-                    controller.invalidateCachedRow(row.key);
-                    row.close();
-                    continue;
-                }
+                    if (ci.isStopRequested())
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                totalkeysWritten++;
+                    AbstractCompactedRow row = iter.next();
+                    RowIndexEntry indexEntry = writer.append(row);
+                    if (indexEntry == null)
+                    {
+                        controller.invalidateCachedRow(row.key);
+                        row.close();
+                        continue;
+                    }
 
-                if (DatabaseDescriptor.getPreheatKeyCache())
-                {
-                    for (SSTableReader sstable : actuallyCompact)
+                    totalkeysWritten++;
+
+                    if (DatabaseDescriptor.getPreheatKeyCache())
                     {
-                        if (sstable.getCachedPosition(row.key, false) != null)
+                        for (SSTableReader sstable : actuallyCompact)
                         {
-                            cachedKeys.put(row.key, indexEntry);
-                            break;
+                            if (sstable.getCachedPosition(row.key, false) != null)
+                            {
+                                cachedKeys.put(row.key, indexEntry);
+                                break;
+                            }
                         }
                     }
-                }
 
-                if (newSSTableSegmentThresholdReached(writer))
-                {
-                    // tmp = false because later we want to query it with descriptor from SSTableReader
-                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
-                    writers.add(writer);
-                    cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+                    if (newSSTableSegmentThresholdReached(writer))
+                    {
+                        // tmp = false because later we want to query it with descriptor from SSTableReader
+                        cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
+                        returnWriteDirectory(dataDirectory, writeSize);
+                        // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
+                        dataDirectory = null;
+                        writeSize = getExpectedWriteSize() / estimatedSSTables;
+                        dataDirectory = getWriteDirectory(writeSize);
+                        writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
+                        writers.add(writer);
+                        cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+                    }
                 }
             }
+            finally
+            {
+                if (dataDirectory != null)
+                    returnWriteDirectory(dataDirectory, writeSize);
+            }
 
             if (writer.getFilePointer() > 0)
             {
@@ -291,6 +306,7 @@ public class CompactionTask extends AbstractCompactionTask
 
     private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
     {
+        assert sstableDirectory != null;
         return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
                                  keysPerSSTable,
                                  cfs.metadata,
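
The compaction-side change is that each output sstable now asks for a write directory sized to roughly getExpectedWriteSize() / estimatedSSTables, instead of binding the whole task to one disk up front. A rough, self-contained illustration of that per-sstable placement follows; the most-free-space selection and the Dir type are assumptions made for the sketch, not Cassandra's getWriteableLocation policy.

import java.util.ArrayList;
import java.util.List;

class SplitCompactionSketch
{
    static class Dir
    {
        final String path;
        long freeBytes;
        Dir(String path, long freeBytes) { this.path = path; this.freeBytes = freeBytes; }
    }

    // Pick the directory with the most free space that can still hold writeSize, or null.
    static Dir pickDirectory(List<Dir> dirs, long writeSize)
    {
        Dir best = null;
        for (Dir d : dirs)
            if (d.freeBytes >= writeSize && (best == null || d.freeBytes > best.freeBytes))
                best = d;
        return best;
    }

    public static void main(String[] args)
    {
        List<Dir> dirs = new ArrayList<>();
        dirs.add(new Dir("/data1", 10L << 30));
        dirs.add(new Dir("/data2", 12L << 30));

        long expectedTotal = 8L << 30;  // pretend the compaction writes 8 GB in total
        long estimatedSSTables = 4;     // LCS caps sstable size, so expect several outputs
        long perSSTable = expectedTotal / estimatedSSTables;

        for (int i = 0; i < estimatedSSTables; i++)
        {
            Dir d = pickDirectory(dirs, perSSTable); // re-pick for every output sstable
            if (d == null)
                throw new RuntimeException("Insufficient disk space to write " + perSSTable + " bytes");
            d.freeBytes -= perSSTable;               // account for the reservation
            System.out.println("sstable " + i + " -> " + d.path);
        }
    }
}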

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..93b06ab 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -17,23 +17,16 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.File;
-
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public abstract class DiskAwareRunnable extends WrappedRunnable
 {
-    /**
-     * Run this task after selecting the optimal disk for it
-     */
-    protected void runMayThrow() throws Exception
+    protected Directories.DataDirectory getWriteDirectory(long writeSize)
     {
-        long writeSize;
         Directories.DataDirectory directory;
         while (true)
         {
-            writeSize = getExpectedWriteSize();
             directory = getDirectories().getWriteableLocation();
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
@@ -43,15 +36,13 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
 
         directory.currentTasks.incrementAndGet();
         directory.estimatedWorkingSize.addAndGet(writeSize);
-        try
-        {
-            runWith(getDirectories().getLocationForDisk(directory));
-        }
-        finally
-        {
-            directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
-            directory.currentTasks.decrementAndGet();
-        }
+        return directory;
+    }
+
+    protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
+    {
+        directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+        directory.currentTasks.decrementAndGet();
     }
 
     /**
@@ -61,18 +52,6 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
     protected abstract Directories getDirectories();
 
     /**
-     * Executes this task on given {@code sstableDirectory}.
-     * @param sstableDirectory sstable directory to work on
-     */
-    protected abstract void runWith(File sstableDirectory) throws Exception;
-
-    /**
-     * Get expected write size to determine which disk to use for this task.
-     * @return expected size in bytes this task will write to disk.
-     */
-    public abstract long getExpectedWriteSize();
-
-    /**
      * Called if no disk is available with free space for the full write size.
      * @return true if the scope of the task was successfully reduced.
      */
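
DiskAwareRunnable keeps its selection loop but exposes it as getWriteDirectory: ask for a writable location, let the task shrink itself via reduceScopeForLimitedSpace if nothing fits, and throw a RuntimeException only when neither helps. The sketch below models just that loop with stand-in interfaces; it is not the Cassandra class, and the writeSize parameter on the toy provider is borrowed from the 2.1 variant shown later in this thread (the 2.0 getWriteableLocation takes no argument).

class DiskSelectionSketch
{
    interface DirectoryProvider { String getWriteableLocation(long writeSize); }
    interface ScopedTask { boolean reduceScopeForLimitedSpace(); }

    // Same shape as the new getWriteDirectory: keep asking until a location fits
    // or the task can no longer shrink, then fail loudly if nothing was found.
    static String getWriteDirectory(DirectoryProvider dirs, ScopedTask task, long writeSize)
    {
        String directory;
        while (true)
        {
            directory = dirs.getWriteableLocation(writeSize);
            if (directory != null || !task.reduceScopeForLimitedSpace())
                break;
        }
        if (directory == null)
            throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
        return directory;
    }

    public static void main(String[] args)
    {
        // Contrived setup: no directory fits on the first attempt, one becomes
        // available after the task reduces its scope once.
        final int[] attempts = { 0 };
        DirectoryProvider dirs = writeSize -> attempts[0]++ == 0 ? null : "/data1";
        ScopedTask task = () -> true; // pretend the task can always shrink
        System.out.println(getWriteDirectory(dirs, task, 64L << 20));
    }
}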


[2/4] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/Memtable.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java
	src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d01c365
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d01c365
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d01c365

Branch: refs/heads/trunk
Commit: 0d01c36599a7721a864780a3a10e134fdfa6797a
Parents: f02d194 2ce1ad8
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:27:50 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:27:50 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  6 +++--
 .../cassandra/db/compaction/CompactionTask.java | 10 ++++-----
 .../cassandra/io/util/DiskAwareRunnable.java    | 23 ++------------------
 4 files changed, 12 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 96da1bd,6a5ac0d..313000a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.0.12:
 +2.1.3
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 +Merged from 2.0:
+  * Make LCS split compaction results over all data directories (CASSANDRA-8329)
   * Fix some failing queries that use multi-column relations
     on COMPACT STORAGE tables (CASSANDRA-8264)
   * Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index ba3864f,425b352..3ae5da4
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -306,12 -336,23 +306,14 @@@ public class Memtabl
              return estimatedSize;
          }
  
-         protected void runWith(File sstableDirectory) throws Exception
+         protected void runMayThrow() throws Exception
          {
+             long writeSize = getExpectedWriteSize();
+             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+             File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
              assert sstableDirectory != null : "Flush task is not bound to any disk";
- 
 -            try
 -            {
 -                SSTableReader sstable = writeSortedContents(context, sstableDirectory);
 -                cfs.replaceFlushed(Memtable.this, sstable);
 -                latch.countDown();
 -            }
 -            finally
 -            {
 -                if (dataDirectory != null)
 -                    returnWriteDirectory(dataDirectory, writeSize);
 -            }
 +            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
 +            cfs.replaceFlushed(Memtable.this, sstable);
          }
  
          protected Directories getDirectories()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b442482,08fe81a..0e8900d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -106,11 -91,8 +106,11 @@@ public class CompactionTask extends Abs
      {
          // The collection of sstables passed may be empty (but not null); even if
          // it is not empty, it may compact down to nothing if all rows are deleted.
-         assert sstables != null && sstableDirectory != null;
+         assert sstables != null;
  
 +        if (sstables.size() == 0)
 +            return;
 +
          // Note that the current compaction strategy, is not necessarily the one this task was created under.
          // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
          AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
@@@ -133,147 -112,206 +133,147 @@@
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
 -        logger.info("Compacting {}", toCompact);
 +        logger.info("Compacting {}", sstables);
  
          long start = System.nanoTime();
 -        long totalkeysWritten = 0;
 -
 -        long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
 -        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
 -        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 -        if (logger.isDebugEnabled())
 -            logger.debug("Expected bloom filter size : " + keysPerSSTable);
 -
 -        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
 -                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
 -                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
 -        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
 -        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 -
 -        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 -        // replace the old entries.  Track entries to preheat here until then.
 -        Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
 -
 -        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
 -        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
 -
 -        if (collector != null)
 -            collector.beginCompaction(ci);
 -        try
 +        long totalKeysWritten = 0;
 +
 +        try (CompactionController controller = getCompactionController(sstables);)
          {
 -            if (!iter.hasNext())
 -            {
 -                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 -                // we need to sync it (via closeAndOpen) first, so there is no period during which
 -                // a crash could cause data loss.
 -                cfs.markObsolete(toCompact, compactionType);
 -                return;
 -            }
  
 -            long writeSize = getExpectedWriteSize() / estimatedSSTables;
 -            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
 -            SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -            writers.add(writer);
 -            try
 +            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 +
 +            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
 +            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
 +            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 +            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 +
 +            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
              {
 -                while (iter.hasNext())
 +                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
 +                Iterator<AbstractCompactedRow> iter = ci.iterator();
 +                List<SSTableReader> newSStables;
 +                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 +                // replace the old entries.  Track entries to preheat here until then.
 +                long minRepairedAt = getMinRepairedAt(actuallyCompact);
 +                // we only need the age of the data that we're actually retaining
 +                long maxAge = getMaxDataAge(actuallyCompact);
 +                if (collector != null)
 +                    collector.beginCompaction(ci);
 +                long lastCheckObsoletion = start;
 +                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
 +                try
                  {
 -                    if (ci.isStopRequested())
 -                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 -
 -                    AbstractCompactedRow row = iter.next();
 -                    RowIndexEntry indexEntry = writer.append(row);
 -                    if (indexEntry == null)
 +                    if (!iter.hasNext())
                      {
 -                        controller.invalidateCachedRow(row.key);
 -                        row.close();
 -                        continue;
 +                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 +                        // we need to sync it (via closeAndOpen) first, so there is no period during which
 +                        // a crash could cause data loss.
 +                        cfs.markObsolete(sstables, compactionType);
 +                        return;
                      }
  
-                     writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
 -                    totalkeysWritten++;
 -
 -                    if (DatabaseDescriptor.getPreheatKeyCache())
++                    writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
 +                    while (iter.hasNext())
                      {
 -                        for (SSTableReader sstable : actuallyCompact)
 +                        if (ci.isStopRequested())
 +                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 +
 +                        AbstractCompactedRow row = iter.next();
 +                        if (writer.append(row) != null)
                          {
 -                            if (sstable.getCachedPosition(row.key, false) != null)
 +                            totalKeysWritten++;
 +                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
                              {
-                                 writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
 -                                cachedKeys.put(row.key, indexEntry);
 -                                break;
++                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
                              }
                          }
 -                    }
  
 -                    if (newSSTableSegmentThresholdReached(writer))
 -                    {
 -                        // tmp = false because later we want to query it with descriptor from SSTableReader
 -                        cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -                        returnWriteDirectory(dataDirectory, writeSize);
 -                        // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
 -                        dataDirectory = null;
 -                        writeSize = getExpectedWriteSize() / estimatedSSTables;
 -                        dataDirectory = getWriteDirectory(writeSize);
 -                        writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -                        writers.add(writer);
 -                        cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 +                        if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
 +                        {
 +                            controller.maybeRefreshOverlaps();
 +                            lastCheckObsoletion = System.nanoTime();
 +                        }
                      }
 -                }
 -            }
 -            finally
 -            {
 -                if (dataDirectory != null)
 -                    returnWriteDirectory(dataDirectory, writeSize);
 -            }
 -
 -            if (writer.getFilePointer() > 0)
 -            {
 -                cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -            }
 -            else
 -            {
 -                writer.abort();
 -                writers.remove(writer);
 -            }
  
 -            long maxAge = getMaxDataAge(toCompact);
 -            for (SSTableWriter completedWriter : writers)
 -                sstables.add(completedWriter.closeAndOpenReader(maxAge));
 -        }
 -        catch (Throwable t)
 -        {
 -            for (SSTableWriter writer : writers)
 -                writer.abort();
 -            // also remove already completed SSTables
 -            for (SSTableReader sstable : sstables)
 -            {
 -                sstable.markObsolete();
 -                sstable.releaseReference();
 -            }
 -            throw Throwables.propagate(t);
 -        }
 -        finally
 -        {
 -            controller.close();
 -
 -            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 -            // (in replaceCompactedSSTables)
 -            if (taskId != null)
 -                SystemKeyspace.finishCompaction(taskId);
 -
 -            if (collector != null)
 -                collector.finishCompaction(ci);
 -
 -            try
 -            {
 -                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
 -                // we don't end up with compaction information hanging around indefinitely in limbo.
 -                iter.close();
 -            }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 -        }
 -
 -        replaceCompactedSSTables(toCompact, sstables);
 -        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
 -        for (SSTableReader sstable : sstables)
 -        {
 -            if (sstable.acquireReference())
 -            {
 -                try
 +                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
 +                    newSStables = writer.finish();
 +                }
 +                catch (Throwable t)
                  {
 -                    sstable.preheat(cachedKeyMap.get(sstable.descriptor));
 +                    writer.abort();
 +                    throw t;
                  }
                  finally
                  {
- 
 -                    sstable.releaseReference();
 +                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 +                    // (in replaceCompactedSSTables)
 +                    if (taskId != null)
 +                        SystemKeyspace.finishCompaction(taskId);
 +
 +                    if (collector != null)
 +                        collector.finishCompaction(ci);
                  }
 -            }
 -        }
  
 -        // log a bunch of statistics about the result and save to system table compaction_history
 -        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 -        long startsize = SSTable.getTotalBytes(toCompact);
 -        long endsize = SSTable.getTotalBytes(sstables);
 -        double ratio = (double) endsize / (double) startsize;
 -
 -        StringBuilder builder = new StringBuilder();
 -        for (SSTableReader reader : sstables)
 -            builder.append(reader.descriptor.baseFilename()).append(",");
 -
 -        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 -        long totalSourceRows = 0;
 -        long[] counts = ci.getMergedRowCounts();
 -        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 -        Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
 -        for (int i = 0; i < counts.length; i++)
 -        {
 -            long count = counts[i];
 -            if (count == 0)
 -                continue;
 -
 -            int rows = i + 1;
 -            totalSourceRows += rows * count;
 -            mergeSummary.append(String.format("%d:%d, ", rows, count));
 -            mergedRows.put(rows, count);
 +                Collection<SSTableReader> oldSStables = this.sstables;
 +                if (!offline)
 +                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 +
 +                // log a bunch of statistics about the result and save to system table compaction_history
 +                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 +                long startsize = SSTableReader.getTotalBytes(oldSStables);
 +                long endsize = SSTableReader.getTotalBytes(newSStables);
 +                double ratio = (double) endsize / (double) startsize;
 +
 +                StringBuilder newSSTableNames = new StringBuilder();
 +                for (SSTableReader reader : newSStables)
 +                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 +
 +                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 +                long totalSourceRows = 0;
 +                long[] counts = ci.getMergedRowCounts();
 +                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 +                Map<Integer, Long> mergedRows = new HashMap<>();
 +                for (int i = 0; i < counts.length; i++)
 +                {
 +                    long count = counts[i];
 +                    if (count == 0)
 +                        continue;
 +
 +                    int rows = i + 1;
 +                    totalSourceRows += rows * count;
 +                    mergeSummary.append(String.format("%d:%d, ", rows, count));
 +                    mergedRows.put(rows, count);
 +                }
 +
 +                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 +                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 +                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
 +                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
 +            }
          }
 +    }
  
 -        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                  toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
 -        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +    private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
 +    {
 +        long minRepairedAt= Long.MAX_VALUE;
 +        for (SSTableReader sstable : actuallyCompact)
 +            minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
 +        if (minRepairedAt == Long.MAX_VALUE)
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
 +        return minRepairedAt;
      }
  
 -    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
 +    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
      {
+         assert sstableDirectory != null;
          return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
                                   keysPerSSTable,
 +                                 repairedAt,
                                   cfs.metadata,
                                   cfs.partitioner,
 -                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
 +                                 new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
      }
  
      protected int getLevel()
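
In the 2.1 merge the same idea rides on SSTableRewriter: whenever the segment threshold is reached, switchWriter is called with a writer placed in a freshly chosen directory, so consecutive output sstables can land on different disks. The toy Rewriter and round-robin picker below only illustrate that control flow; they are not SSTableRewriter or the real directory selection.

import java.util.ArrayList;
import java.util.List;

class SwitchWriterSketch
{
    interface DirectoryPicker { String pick(long expectedBytes); }

    static class Rewriter
    {
        final List<String> outputs = new ArrayList<>();
        long writtenInCurrent;
        void switchWriter(String directory) { outputs.add(directory); writtenInCurrent = 0; }
        void append(long rowBytes) { writtenInCurrent += rowBytes; }
        boolean thresholdReached(long maxBytes) { return writtenInCurrent >= maxBytes; }
    }

    public static void main(String[] args)
    {
        long perSSTable = 160L << 20;                 // pretend 160 MB per output sstable
        String[] disks = { "/data1", "/data2", "/data3" };
        final int[] next = { 0 };
        DirectoryPicker picker = expected -> disks[next[0]++ % disks.length]; // illustrative round-robin

        Rewriter writer = new Rewriter();
        writer.switchWriter(picker.pick(perSSTable)); // first output sstable
        for (int row = 0; row < 10_000; row++)
        {
            writer.append(64 << 10);                  // say, 64 KB per compacted row
            if (writer.thresholdReached(perSSTable))
                writer.switchWriter(picker.pick(perSSTable)); // next sstable may land on another disk
        }
        System.out.println("output sstables went to: " + writer.outputs);
    }
}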

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 6d453e5,93b06ab..4188f6e
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@@ -32,18 -27,24 +27,16 @@@ public abstract class DiskAwareRunnabl
          Directories.DataDirectory directory;
          while (true)
          {
-             writeSize = getExpectedWriteSize();
 -            directory = getDirectories().getWriteableLocation();
 +            directory = getDirectories().getWriteableLocation(writeSize);
              if (directory != null || !reduceScopeForLimitedSpace())
                  break;
          }
          if (directory == null)
              throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
  
-         runWith(getDirectories().getLocationForDisk(directory));
 -        directory.currentTasks.incrementAndGet();
 -        directory.estimatedWorkingSize.addAndGet(writeSize);
+         return directory;
      }
  
 -    protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
 -    {
 -        directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
 -        directory.currentTasks.decrementAndGet();
 -    }
 -
      /**
       * Get sstable directories for the CF.
       * @return Directories instance for the CF.


[4/4] cassandra git commit: Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/cassandra into trunk

Posted by ma...@apache.org.
Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/cassandra into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/065aeeb4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/065aeeb4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/065aeeb4

Branch: refs/heads/trunk
Commit: 065aeeb4a9d2aca998f96f817649714badb2cc80
Parents: cd4f729 528cc3d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:39:05 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:39:05 2014 +0100

----------------------------------------------------------------------
 lib/jamm-0.3.0.jar | Bin 21149 -> 21033 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------



[3/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4f729e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4f729e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4f729e

Branch: refs/heads/trunk
Commit: cd4f729e9d2328564a77ac7bc52a392edd3d2b82
Parents: be0b451 0d01c36
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:37:13 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:37:13 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  6 +++--
 .../cassandra/db/compaction/CompactionTask.java |  9 ++++----
 .../cassandra/io/util/DiskAwareRunnable.java    | 23 ++------------------
 4 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 808626b,0e8900d..1abb4ee
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -181,7 -173,7 +181,7 @@@ public class CompactionTask extends Abs
                          return;
                      }
  
-                     writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat));
 -                    writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
++                    writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt, sstableFormat));
                      while (iter.hasNext())
                      {
                          if (ci.isStopRequested())
@@@ -193,7 -185,7 +193,7 @@@
                              totalKeysWritten++;
                              if (newSSTableSegmentThresholdReached(writer.currentWriter()))
                              {
-                                 writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat));
 -                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
++                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt, sstableFormat));
                              }
                          }