Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/07 09:09:00 UTC

[cassandra] branch cassandra-3.0 updated: liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b105e91  liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
b105e91 is described below

commit b105e919678240b5f448df9acaf6c93117f0c0cc
Author: David Capwell <dc...@gmail.com>
AuthorDate: Fri Mar 27 18:14:02 2020 -0700

    liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
    
    Patch by David Capwell; reviewed by marcuse for CASSANDRA-15674
---
 CHANGES.txt                                        |   1 +
 .../db/lifecycle/LifecycleTransaction.java         |  44 +++++++
 .../io/sstable/IndexSummaryRedistribution.java     |  27 +++-
 .../cassandra/io/sstable/format/SSTableReader.java |   6 -
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  | 139 +++++++++++++++++++++
 5 files changed, 210 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6e6f418..0a0a4d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
  * Fix Debian init start/stop (CASSANDRA-15770)
  * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
  * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 7ecaa38..4abce33 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static com.google.common.base.Functions.compose;
@@ -123,6 +124,10 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
     // the tidier and their readers, to be used for marking readers obsoleted during a commit
     private List<LogTransaction.Obsoletion> obsoletions;
 
+    // commit/rollback hooks
+    private List<Runnable> commitHooks = new ArrayList<>();
+    private List<Runnable> abortHooks = new ArrayList<>();
+
     /**
      * construct a Transaction for use in an offline operation
      */
@@ -223,12 +228,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
 
         accumulate = markObsolete(obsoletions, accumulate);
         accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
+        accumulate = runOnCommitHooks(accumulate);
         accumulate = release(selfRefs(logged.obsolete), accumulate);
         accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);
 
         return accumulate;
     }
 
+
     /**
      * undo all of the changes made by this transaction, resetting the state to its original form
      */
@@ -259,6 +266,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         accumulate = tracker.notifySSTablesChanged(invalid, restored, OperationType.COMPACTION, accumulate);
         // setReplaced immediately preceding versions that have not been obsoleted
         accumulate = setReplaced(logged.update, accumulate);
+        accumulate = runOnAbortHooks(accumulate);
         // we have replaced all of logged.update and never made visible staged.update,
         // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
         // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
@@ -270,6 +278,32 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         return accumulate;
     }
 
+    private Throwable runOnCommitHooks(Throwable accumulate)
+    {
+        return runHooks(commitHooks, accumulate);
+    }
+
+    private Throwable runOnAbortHooks(Throwable accumulate)
+    {
+        return runHooks(abortHooks, accumulate);
+    }
+
+    private static Throwable runHooks(Iterable<Runnable> hooks, Throwable accumulate)
+    {
+        for (Runnable hook : hooks)
+        {
+            try
+            {
+                hook.run();
+            }
+            catch (Exception e)
+            {
+                accumulate = Throwables.merge(accumulate, e);
+            }
+        }
+        return accumulate;
+    }
+
     @Override
     protected Throwable doPostCleanup(Throwable accumulate)
     {
@@ -366,6 +400,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         staged.obsolete.add(reader);
     }
 
+    public void runOnCommit(Runnable fn)
+    {
+        commitHooks.add(fn);
+    }
+
+    public void runOnAbort(Runnable fn)
+    {
+        abortHooks.add(fn);
+    }
+
     /**
      * obsolete every file in the original transaction
      */
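
For orientation, here is a self-contained sketch of the commit/abort hook pattern the hunks above add to LifecycleTransaction: hooks queue up while the transaction is open, and exactly one of the two lists runs depending on how the transaction resolves. The class below is an illustrative stand-in, not the Cassandra implementation.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-in for the commit/abort hook pattern above.
    public class HookSketch
    {
        private final List<Runnable> commitHooks = new ArrayList<>();
        private final List<Runnable> abortHooks = new ArrayList<>();

        public void runOnCommit(Runnable fn) { commitHooks.add(fn); }
        public void runOnAbort(Runnable fn) { abortHooks.add(fn); }

        // doCommit/doAbort each run exactly one of the two lists
        public void commit() { commitHooks.forEach(Runnable::run); }
        public void abort()  { abortHooks.forEach(Runnable::run); }

        public static void main(String[] args)
        {
            HookSketch txn = new HookSketch();
            txn.runOnCommit(() -> System.out.println("commit: drop the old size from the metrics"));
            txn.runOnAbort(() -> System.out.println("abort: apply the size delta instead"));
            txn.abort(); // only the abort hook fires
        }
    }

Note that in the real patch, runHooks also merges any hook exception into the accumulated Throwable via Throwables.merge, so one failing hook cannot prevent the remaining hooks from running.
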
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 189ee2d..45bd7eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -37,11 +37,11 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -261,14 +261,39 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
                          sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
                          entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
             ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+            long oldSize = sstable.bytesOnDisk();
             SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+            long newSize = replacement.bytesOnDisk();
             newSSTables.add(replacement);
             transactions.get(sstable.metadata.cfId).update(replacement, true);
+            addHooks(cfs, transactions, oldSize, newSize);
         }
 
         return newSSTables;
     }
 
+    /**
+     * Add hooks to correctly update the storage load metrics once the transaction is committed or aborted
+     */
+    @SuppressWarnings("resource") // Transactions are closed in finally outside of this method
+    private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize)
+    {
+        LifecycleTransaction txn = transactions.get(cfs.metadata.cfId);
+        txn.runOnCommit(() -> {
+            // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
+            StorageMetrics.load.dec(oldSize);
+            cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+            cfs.metric.totalDiskSpaceUsed.dec(oldSize);
+        });
+        txn.runOnAbort(() -> {
+            // the local disk was modified but the bookkeeping couldn't be committed, so apply the delta
+            long delta = oldSize - newSize; // if the new size is larger this is negative, so dec becomes an inc
+            StorageMetrics.load.dec(delta);
+            cfs.metric.liveDiskSpaceUsed.dec(delta);
+            cfs.metric.totalDiskSpaceUsed.dec(delta);
+        });
+    }
+
     @VisibleForTesting
     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
     {
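
The abort-path arithmetic above is worth a worked example: on abort the summary component has already been rewritten on disk at newSize, but the metrics were never adjusted, so the correction is oldSize - newSize. A standalone sketch with made-up sizes (the class name is illustrative):

    // Standalone sketch of the abort-path correction, with made-up sizes.
    public class AbortDeltaSketch
    {
        public static void main(String[] args)
        {
            long liveDiskSpaceUsed = 1000; // metric before redistribution
            long oldSize = 1000;           // sstable bytes on disk before resampling
            long newSize = 940;            // bytes on disk after the summary was rewritten

            // abort: the disk already holds newSize, but the metric still
            // reflects oldSize, so apply the delta
            long delta = oldSize - newSize; // negative if the summary grew
            liveDiskSpaceUsed -= delta;     // dec(delta); a negative delta increments

            System.out.println(liveDiskSpaceUsed); // prints 940, matching the disk
        }
    }

On the commit path no delta is needed: per the comment above, Transactional.commit() accounts the replacement sstable at its new size, so the hook only has to retire the stale oldSize (CASSANDRA-13738).
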
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c094e0b..0485275 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1133,7 +1133,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 
             IndexSummary newSummary;
-            long oldSize = bytesOnDisk();
 
             // We have to rebuild the summary from the on-disk primary index in three cases:
             // 1. The sampling level went up, so we need to read more entries off disk
@@ -1162,11 +1161,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 saveSummary(ibuilder, dbuilder, newSummary);
             }
 
-            // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
-            StorageMetrics.load.dec(oldSize);
-            parent.metric.liveDiskSpaceUsed.dec(oldSize);
-            parent.metric.totalDiskSpaceUsed.dec(oldSize);
-
             return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
         }
     }
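
This removal is the heart of the fix: before the patch, the metrics were decremented eagerly inside cloneWithNewSummarySamplingLevel, so an interrupted redistribution aborted the transaction without anything compensating the decrement, leaving the counters below the real on-disk total. A standalone sketch of that pre-patch drift, with made-up numbers (the class name is illustrative):

    // Standalone sketch of the pre-patch drift, with made-up numbers.
    public class EagerDecrementSketch
    {
        public static void main(String[] args)
        {
            long bytesOnDisk = 1000;
            long liveDiskSpaceUsed = 1000;

            // pre-patch: decremented during the clone, before the transaction resolved
            long oldSize = 1000;
            liveDiskSpaceUsed -= oldSize;

            // redistribution interrupted: the transaction aborts and restores the
            // sstable, but nothing restores the metric, which is now short by oldSize
            System.out.println(liveDiskSpaceUsed + " vs " + bytesOnDisk); // 0 vs 1000
        }
    }
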
diff --git a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
new file mode 100644
index 0000000..ddacc6b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@ -0,0 +1,139 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DiskSpaceMetricsTest extends CQLTester
+{
+    /**
+     * This test runs the system with normal operations and makes sure the disk metrics match reality
+     */
+    @Test
+    public void baseline() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        // disable compaction so nothing changes between calculations
+        cfs.disableAutoCompaction();
+
+        // create 100 sstables
+        for (int i = 0; i < 100; i++)
+            insert(cfs, i);
+        assertDiskSpaceEqual(cfs);
+    }
+
+    /**
+     * If index summary downsampling is interrupted in the middle, the metrics should still reflect the real data
+     */
+    @Test
+    public void summaryRedistribution() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        // disable compaction so nothing changes between calculations
+        cfs.disableAutoCompaction();
+
+        // create 100 sstables, make sure they have more than 1 value, else sampling can't happen
+        for (int i = 0; i < 100; i++)
+            insertN(cfs, 10, i);
+        assertDiskSpaceEqual(cfs);
+
+        // summary downsample
+        for (int i = 0; i < 100; i++)
+        {
+            indexDownsampleCancelLastSSTable(cfs);
+            assertDiskSpaceEqual(cfs);
+        }
+    }
+
+    private void insert(ColumnFamilyStore cfs, long value) throws Throwable
+    {
+        insertN(cfs, 1, value);
+    }
+
+    private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable
+    {
+        for (int i = 0; i < n; i++)
+            execute("INSERT INTO %s (pk) VALUES (?)", base + i);
+
+        // flush to write the sstable
+        cfs.forceBlockingFlush();
+    }
+
+    private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
+    {
+        long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
+        long actual = 0;
+        for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+            actual += sstable.bytesOnDisk();
+
+        Assert.assertEquals("bytes on disk does not match current metric liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+
+        // totalDiskSpaceUsed is based on SSTable deletion, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
+        // wait for there to be no more pending sstable releases
+        LifecycleTransaction.waitForDeletions();
+        long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
+        Assert.assertEquals("bytes on disk does not match current metric totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+    }
+
+    private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
+    {
+        List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL));
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+        Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn);
+        // fail on the last file (* 3 because we call isStopRequested 3 times for each sstable, and we should fail on the last)
+        AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1);
+        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0) {
+            public boolean isStopRequested()
+            {
+                return countdown.decrementAndGet() == 0;
+            }
+        };
+        try
+        {
+            IndexSummaryManager.redistributeSummaries(redistribution);
+            Assert.fail("Should throw CompactionInterruptedException");
+        }
+        catch (CompactionInterruptedException e)
+        {
+            // expected: this is the interruption the countdown was set up to trigger
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            try
+            {
+                FBUtilities.closeAll(txns.values());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
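
One note on the cancellation trick in indexDownsampleCancelLastSSTable: isStopRequested is polled three times per sstable, so starting the countdown at 3 * sstables.size() - 1 makes the stop land during the checks for the final sstable. A standalone sketch of the same countdown idea (the class name is illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    // Standalone sketch of the countdown cancellation used by the test.
    public class CountdownSketch
    {
        public static void main(String[] args)
        {
            int sstables = 4;
            AtomicInteger countdown = new AtomicInteger(3 * sstables - 1);

            for (int poll = 1; poll <= 3 * sstables; poll++)
            {
                if (countdown.decrementAndGet() == 0)
                {
                    // fires on poll 11 of 12, i.e. while the last sstable is in flight
                    System.out.println("stop requested on poll " + poll);
                    break;
                }
            }
        }
    }
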


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org