You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/07 09:09:00 UTC
[cassandra] branch cassandra-3.0 updated: liveDiskSpaceUsed and
totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets
interrupted
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new b105e91 liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
b105e91 is described below
commit b105e919678240b5f448df9acaf6c93117f0c0cc
Author: David Capwell <dc...@gmail.com>
AuthorDate: Fri Mar 27 18:14:02 2020 -0700
liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
Patch by David Capwell; reviewed by marcuse for CASSANDRA-15674
---
CHANGES.txt | 1 +
.../db/lifecycle/LifecycleTransaction.java | 44 +++++++
.../io/sstable/IndexSummaryRedistribution.java | 27 +++-
.../cassandra/io/sstable/format/SSTableReader.java | 6 -
.../apache/cassandra/io/DiskSpaceMetricsTest.java | 139 +++++++++++++++++++++
5 files changed, 210 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e6f418..0a0a4d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.21
+ * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
* Fix Debian init start/stop (CASSANDRA-15770)
* Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
* Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 7ecaa38..4abce33 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Transactional;
import static com.google.common.base.Functions.compose;
@@ -123,6 +124,10 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
// the tidier and their readers, to be used for marking readers obsoleted during a commit
private List<LogTransaction.Obsoletion> obsoletions;
+ // commit/rollback hooks
+ private List<Runnable> commitHooks = new ArrayList<>();
+ private List<Runnable> abortHooks = new ArrayList<>();
+
/**
* construct a Transaction for use in an offline operation
*/
@@ -223,12 +228,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
accumulate = markObsolete(obsoletions, accumulate);
accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
+ accumulate = runOnCommitHooks(accumulate);
accumulate = release(selfRefs(logged.obsolete), accumulate);
accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);
return accumulate;
}
+
/**
* undo all of the changes made by this transaction, resetting the state to its original form
*/
@@ -259,6 +266,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
accumulate = tracker.notifySSTablesChanged(invalid, restored, OperationType.COMPACTION, accumulate);
// setReplaced immediately preceding versions that have not been obsoleted
accumulate = setReplaced(logged.update, accumulate);
+ accumulate = runOnAbortooks(accumulate);
// we have replaced all of logged.update and never made visible staged.update,
// and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
// any _staged_ obsoletes should either be in staged.update already, and dealt with there,
@@ -270,6 +278,32 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
return accumulate;
}
+ private Throwable runOnCommitHooks(Throwable accumulate)
+ {
+ return runHooks(commitHooks, accumulate);
+ }
+
+ private Throwable runOnAbortooks(Throwable accumulate)
+ {
+ return runHooks(abortHooks, accumulate);
+ }
+
+ private static Throwable runHooks(Iterable<Runnable> hooks, Throwable accumulate)
+ {
+ for (Runnable hook : hooks)
+ {
+ try
+ {
+ hook.run();
+ }
+ catch (Exception e)
+ {
+ accumulate = Throwables.merge(accumulate, e);
+ }
+ }
+ return accumulate;
+ }
+
@Override
protected Throwable doPostCleanup(Throwable accumulate)
{
@@ -366,6 +400,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
staged.obsolete.add(reader);
}
+ public void runOnCommit(Runnable fn)
+ {
+ commitHooks.add(fn);
+ }
+
+ public void runOnAbort(Runnable fn)
+ {
+ abortHooks.add(fn);
+ }
+
/**
* obsolete every file in the original transaction
*/
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 189ee2d..45bd7eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -37,11 +37,11 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -261,14 +261,39 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+ long oldSize = sstable.bytesOnDisk();
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+ long newSize = replacement.bytesOnDisk();
newSSTables.add(replacement);
transactions.get(sstable.metadata.cfId).update(replacement, true);
+ addHooks(cfs, transactions, oldSize, newSize);
}
return newSSTables;
}
+ /**
+ * Add hooks to correctly update the storage load metrics once the transaction is committed or aborted
+ */
+ @SuppressWarnings("resource") // Transactions are closed in finally outside of this method
+ private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize)
+ {
+ LifecycleTransaction txn = transactions.get(cfs.metadata.cfId);
+ txn.runOnCommit(() -> {
+ // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
+ StorageMetrics.load.dec(oldSize);
+ cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+ cfs.metric.totalDiskSpaceUsed.dec(oldSize);
+ });
+ txn.runOnAbort(() -> {
+ // the local disk was modified but bookkeeping couldn't be committed, apply the delta
+ long delta = oldSize - newSize; // if new is larger this will be negative, so dec will become a inc
+ StorageMetrics.load.dec(delta);
+ cfs.metric.liveDiskSpaceUsed.dec(delta);
+ cfs.metric.totalDiskSpaceUsed.dec(delta);
+ });
+ }
+
@VisibleForTesting
static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
{
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c094e0b..0485275 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1133,7 +1133,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
double effectiveInterval = indexSummary.getEffectiveIndexInterval();
IndexSummary newSummary;
- long oldSize = bytesOnDisk();
// We have to rebuild the summary from the on-disk primary index in three cases:
// 1. The sampling level went up, so we need to read more entries off disk
@@ -1162,11 +1161,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
saveSummary(ibuilder, dbuilder, newSummary);
}
- // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
- StorageMetrics.load.dec(oldSize);
- parent.metric.liveDiskSpaceUsed.dec(oldSize);
- parent.metric.totalDiskSpaceUsed.dec(oldSize);
-
return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
}
}
diff --git a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
new file mode 100644
index 0000000..ddacc6b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@ -0,0 +1,139 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DiskSpaceMetricsTest extends CQLTester
+{
+ /**
+ * This test runs the system with normal operations and makes sure the disk metrics match reality
+ */
+ @Test
+ public void baseline() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ // disable compaction so nothing changes between calculations
+ cfs.disableAutoCompaction();
+
+ // create 100 sstables
+ for (int i = 0; i < 100; i++)
+ insert(cfs, i);
+ assertDiskSpaceEqual(cfs);
+ }
+
+ /**
+ * If index summary downsampling is interrupted in the middle, the metrics still reflect the real data
+ */
+ @Test
+ public void summaryRedistribution() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ // disable compaction so nothing changes between calculations
+ cfs.disableAutoCompaction();
+
+ // create 100 sstables, make sure they have more than 1 value, else sampling can't happen
+ for (int i = 0; i < 100; i++)
+ insertN(cfs, 10, i);
+ assertDiskSpaceEqual(cfs);
+
+ // summary downsample
+ for (int i = 0; i < 100; i++)
+ {
+ indexDownsampleCancelLastSSTable(cfs);
+ assertDiskSpaceEqual(cfs);
+ }
+ }
+
+ private void insert(ColumnFamilyStore cfs, long value) throws Throwable
+ {
+ insertN(cfs, 1, value);
+ }
+
+ private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable
+ {
+ for (int i = 0; i < n; i++)
+ execute("INSERT INTO %s (pk) VALUES (?)", base + i);
+
+ // flush to write the sstable
+ cfs.forceBlockingFlush();
+ }
+
+ private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
+ {
+ long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
+ long actual = 0;
+ for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+ actual += sstable.bytesOnDisk();
+
+ Assert.assertEquals("bytes on disk does not match current metric liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+
+ // totalDiskSpaceUsed is based on SSTable deletion, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
+ // wait for there to be no more pending sstable releases
+ LifecycleTransaction.waitForDeletions();
+ long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
+ Assert.assertEquals("bytes on disk does not match current metric totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+ }
+
+ private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
+ {
+ List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL));
+ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+ Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn);
+ // fail on the last file (* 3 because we call isStopRequested 3 times for each sstable, and we should fail on the last)
+ AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1);
+ IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0) {
+ public boolean isStopRequested()
+ {
+ return countdown.decrementAndGet() == 0;
+ }
+ };
+ try
+ {
+ IndexSummaryManager.redistributeSummaries(redistribution);
+ Assert.fail("Should throw CompactionInterruptedException");
+ }
+ catch (CompactionInterruptedException e)
+ {
+ // trying to get this to happen
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ try
+ {
+ FBUtilities.closeAll(txns.values());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org