You are viewing a plain text version of this content; the canonical HTML version is available at the mailing-list archive.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/24 10:39:30 UTC
[1/4] cassandra git commit: Make LCS split compaction results over many directories
Repository: cassandra
Updated Branches:
refs/heads/trunk 528cc3dd1 -> 065aeeb4a
Make LCS split compaction results over many directories
Patch by marcuse; reviewed by yukim for CASSANDRA-8329
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce1ad8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce1ad8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce1ad8e
Branch: refs/heads/trunk
Commit: 2ce1ad8e6f5d3c5cf781e1ff87cda4f61c89d9ee
Parents: 025b406
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 11:01:17 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 09:43:47 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 20 ++++--
.../cassandra/db/compaction/CompactionTask.java | 74 ++++++++++++--------
.../cassandra/io/util/DiskAwareRunnable.java | 37 +++-------
4 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01ea887..6a5ac0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
* Fix some failing queries that use multi-column relations
on COMPACT STORAGE tables (CASSANDRA-8264)
* Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 0b186dc..425b352 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -336,13 +336,23 @@ public class Memtable
return estimatedSize;
}
- protected void runWith(File sstableDirectory) throws Exception
+ protected void runMayThrow() throws Exception
{
+ long writeSize = getExpectedWriteSize();
+ Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+ File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
assert sstableDirectory != null : "Flush task is not bound to any disk";
-
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
- latch.countDown();
+ try
+ {
+ SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+ cfs.replaceFlushed(Memtable.this, sstable);
+ latch.countDown();
+ }
+ finally
+ {
+ if (dataDirectory != null)
+ returnWriteDirectory(dataDirectory, writeSize);
+ }
}
protected Directories getDirectories()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 5ef4aad..08fe81a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -87,11 +87,11 @@ public class CompactionTask extends AbstractCompactionTask
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
*/
- protected void runWith(File sstableDirectory) throws Exception
+ protected void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null && sstableDirectory != null;
+ assert sstables != null;
// Note that the current compaction strategy, is not necessarily the one this task was created under.
// This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
@@ -149,45 +149,60 @@ public class CompactionTask extends AbstractCompactionTask
return;
}
- SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+ long writeSize = getExpectedWriteSize() / estimatedSSTables;
+ Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+ SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
writers.add(writer);
- while (iter.hasNext())
+ try
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
-
- AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
+ while (iter.hasNext())
{
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
- totalkeysWritten++;
+ AbstractCompactedRow row = iter.next();
+ RowIndexEntry indexEntry = writer.append(row);
+ if (indexEntry == null)
+ {
+ controller.invalidateCachedRow(row.key);
+ row.close();
+ continue;
+ }
- if (DatabaseDescriptor.getPreheatKeyCache())
- {
- for (SSTableReader sstable : actuallyCompact)
+ totalkeysWritten++;
+
+ if (DatabaseDescriptor.getPreheatKeyCache())
{
- if (sstable.getCachedPosition(row.key, false) != null)
+ for (SSTableReader sstable : actuallyCompact)
{
- cachedKeys.put(row.key, indexEntry);
- break;
+ if (sstable.getCachedPosition(row.key, false) != null)
+ {
+ cachedKeys.put(row.key, indexEntry);
+ break;
+ }
}
}
- }
- if (newSSTableSegmentThresholdReached(writer))
- {
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ if (newSSTableSegmentThresholdReached(writer))
+ {
+ // tmp = false because later we want to query it with descriptor from SSTableReader
+ cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
+ returnWriteDirectory(dataDirectory, writeSize);
+ // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
+ dataDirectory = null;
+ writeSize = getExpectedWriteSize() / estimatedSSTables;
+ dataDirectory = getWriteDirectory(writeSize);
+ writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
+ writers.add(writer);
+ cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ }
}
}
+ finally
+ {
+ if (dataDirectory != null)
+ returnWriteDirectory(dataDirectory, writeSize);
+ }
if (writer.getFilePointer() > 0)
{
@@ -291,6 +306,7 @@ public class CompactionTask extends AbstractCompactionTask
private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
{
+ assert sstableDirectory != null;
return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
keysPerSSTable,
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce1ad8e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..93b06ab 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -17,23 +17,16 @@
*/
package org.apache.cassandra.io.util;
-import java.io.File;
-
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.utils.WrappedRunnable;
public abstract class DiskAwareRunnable extends WrappedRunnable
{
- /**
- * Run this task after selecting the optimal disk for it
- */
- protected void runMayThrow() throws Exception
+ protected Directories.DataDirectory getWriteDirectory(long writeSize)
{
- long writeSize;
Directories.DataDirectory directory;
while (true)
{
- writeSize = getExpectedWriteSize();
directory = getDirectories().getWriteableLocation();
if (directory != null || !reduceScopeForLimitedSpace())
break;
@@ -43,15 +36,13 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
directory.currentTasks.incrementAndGet();
directory.estimatedWorkingSize.addAndGet(writeSize);
- try
- {
- runWith(getDirectories().getLocationForDisk(directory));
- }
- finally
- {
- directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
- directory.currentTasks.decrementAndGet();
- }
+ return directory;
+ }
+
+ protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
+ {
+ directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
+ directory.currentTasks.decrementAndGet();
}
/**
@@ -61,18 +52,6 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
protected abstract Directories getDirectories();
/**
- * Executes this task on given {@code sstableDirectory}.
- * @param sstableDirectory sstable directory to work on
- */
- protected abstract void runWith(File sstableDirectory) throws Exception;
-
- /**
- * Get expected write size to determine which disk to use for this task.
- * @return expected size in bytes this task will write to disk.
- */
- public abstract long getExpectedWriteSize();
-
- /**
* Called if no disk is available with free space for the full write size.
* @return true if the scope of the task was successfully reduced.
*/
[2/4] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/Memtable.java
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d01c365
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d01c365
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d01c365
Branch: refs/heads/trunk
Commit: 0d01c36599a7721a864780a3a10e134fdfa6797a
Parents: f02d194 2ce1ad8
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:27:50 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:27:50 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 6 +++--
.../cassandra/db/compaction/CompactionTask.java | 10 ++++-----
.../cassandra/io/util/DiskAwareRunnable.java | 23 ++------------------
4 files changed, 12 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 96da1bd,6a5ac0d..313000a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
-2.0.12:
+2.1.3
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+Merged from 2.0:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
* Fix some failing queries that use multi-column relations
on COMPACT STORAGE tables (CASSANDRA-8264)
* Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index ba3864f,425b352..3ae5da4
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -306,12 -336,23 +306,14 @@@ public class Memtabl
return estimatedSize;
}
- protected void runWith(File sstableDirectory) throws Exception
+ protected void runMayThrow() throws Exception
{
+ long writeSize = getExpectedWriteSize();
+ Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+ File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
assert sstableDirectory != null : "Flush task is not bound to any disk";
-
- try
- {
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
- latch.countDown();
- }
- finally
- {
- if (dataDirectory != null)
- returnWriteDirectory(dataDirectory, writeSize);
- }
+ SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+ cfs.replaceFlushed(Memtable.this, sstable);
}
protected Directories getDirectories()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b442482,08fe81a..0e8900d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -106,11 -91,8 +106,11 @@@ public class CompactionTask extends Abs
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null && sstableDirectory != null;
+ assert sstables != null;
+ if (sstables.size() == 0)
+ return;
+
// Note that the current compaction strategy, is not necessarily the one this task was created under.
// This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
@@@ -133,147 -112,206 +133,147 @@@
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
- logger.info("Compacting {}", toCompact);
+ logger.info("Compacting {}", sstables);
long start = System.nanoTime();
- long totalkeysWritten = 0;
-
- long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
- long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
- AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
- : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
- if (collector != null)
- collector.beginCompaction(ci);
- try
+ long totalKeysWritten = 0;
+
+ try (CompactionController controller = getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(toCompact, compactionType);
- return;
- }
- long writeSize = getExpectedWriteSize() / estimatedSSTables;
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- try
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
{
- while (iter.hasNext())
+ AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ List<SSTableReader> newSStables;
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ long lastCheckObsoletion = start;
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+ try
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
-
- AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
+ if (!iter.hasNext())
{
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- totalkeysWritten++;
-
- if (DatabaseDescriptor.getPreheatKeyCache())
++ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
{
- for (SSTableReader sstable : actuallyCompact)
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
{
- if (sstable.getCachedPosition(row.key, false) != null)
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
- cachedKeys.put(row.key, indexEntry);
- break;
++ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
}
}
- }
- if (newSSTableSegmentThresholdReached(writer))
- {
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- returnWriteDirectory(dataDirectory, writeSize);
- // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
- dataDirectory = null;
- writeSize = getExpectedWriteSize() / estimatedSSTables;
- dataDirectory = getWriteDirectory(writeSize);
- writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+ {
+ controller.maybeRefreshOverlaps();
+ lastCheckObsoletion = System.nanoTime();
+ }
}
- }
- }
- finally
- {
- if (dataDirectory != null)
- returnWriteDirectory(dataDirectory, writeSize);
- }
-
- if (writer.getFilePointer() > 0)
- {
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- }
- else
- {
- writer.abort();
- writers.remove(writer);
- }
- long maxAge = getMaxDataAge(toCompact);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
- }
- catch (Throwable t)
- {
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
- }
- throw Throwables.propagate(t);
- }
- finally
- {
- controller.close();
-
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
-
- if (collector != null)
- collector.finishCompaction(ci);
-
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (SSTableReader sstable : sstables)
- {
- if (sstable.acquireReference())
- {
- try
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ newSStables = writer.finish();
+ }
+ catch (Throwable t)
{
- sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+ writer.abort();
+ throw t;
}
finally
{
-
- sstable.releaseReference();
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
+
+ if (collector != null)
+ collector.finishCompaction(ci);
}
- }
- }
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
+ }
+
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+ }
}
+ }
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+ {
+ long minRepairedAt= Long.MAX_VALUE;
+ for (SSTableReader sstable : actuallyCompact)
+ minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+ if (minRepairedAt == Long.MAX_VALUE)
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
+ return minRepairedAt;
}
- private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
{
+ assert sstableDirectory != null;
return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
keysPerSSTable,
+ repairedAt,
cfs.metadata,
cfs.partitioner,
- SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
+ new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
}
protected int getLevel()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 6d453e5,93b06ab..4188f6e
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@@ -32,18 -27,24 +27,16 @@@ public abstract class DiskAwareRunnabl
Directories.DataDirectory directory;
while (true)
{
- writeSize = getExpectedWriteSize();
- directory = getDirectories().getWriteableLocation();
+ directory = getDirectories().getWriteableLocation(writeSize);
if (directory != null || !reduceScopeForLimitedSpace())
break;
}
if (directory == null)
throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
- runWith(getDirectories().getLocationForDisk(directory));
- directory.currentTasks.incrementAndGet();
- directory.estimatedWorkingSize.addAndGet(writeSize);
+ return directory;
}
- protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
- {
- directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
- directory.currentTasks.decrementAndGet();
- }
-
/**
* Get sstable directories for the CF.
* @return Directories instance for the CF.
[4/4] cassandra git commit: Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/cassandra into trunk
Posted by ma...@apache.org.
Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/cassandra into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/065aeeb4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/065aeeb4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/065aeeb4
Branch: refs/heads/trunk
Commit: 065aeeb4a9d2aca998f96f817649714badb2cc80
Parents: cd4f729 528cc3d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:39:05 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:39:05 2014 +0100
----------------------------------------------------------------------
lib/jamm-0.3.0.jar | Bin 21149 -> 21033 bytes
1 file changed, 0 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
[3/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4f729e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4f729e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4f729e
Branch: refs/heads/trunk
Commit: cd4f729e9d2328564a77ac7bc52a392edd3d2b82
Parents: be0b451 0d01c36
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 24 10:37:13 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 24 10:37:13 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 6 +++--
.../cassandra/db/compaction/CompactionTask.java | 9 ++++----
.../cassandra/io/util/DiskAwareRunnable.java | 23 ++------------------
4 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4f729e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 808626b,0e8900d..1abb4ee
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -181,7 -173,7 +181,7 @@@ public class CompactionTask extends Abs
return;
}
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat));
- writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
++ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt, sstableFormat));
while (iter.hasNext())
{
if (ci.isStopRequested())
@@@ -193,7 -185,7 +193,7 @@@
totalKeysWritten++;
if (newSSTableSegmentThresholdReached(writer.currentWriter()))
{
- writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt, sstableFormat));
- writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
++ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt, sstableFormat));
}
}