You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/07 09:09:03 UTC

[cassandra] branch trunk updated (da95e4b -> d95aebe)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from da95e4b  Add tunable initial size and growth factor to RangeTombstoneList
     new b105e91  liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
     new 18736b4  Merge branch 'cassandra-3.0' into cassandra-3.11
     new d95aebe  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../db/lifecycle/LifecycleTransaction.java         |  45 ++++++-
 .../io/sstable/IndexSummaryRedistribution.java     |  28 ++++-
 .../cassandra/io/sstable/format/SSTableReader.java |   6 -
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  | 138 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 9 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d95aebe0a3583179605b63dba0a4edc33cb2f97f
Merge: da95e4b 18736b4
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu May 7 11:00:46 2020 +0200

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../db/lifecycle/LifecycleTransaction.java         |  45 ++++++-
 .../io/sstable/IndexSummaryRedistribution.java     |  28 ++++-
 .../cassandra/io/sstable/format/SSTableReader.java |   6 -
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  | 138 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 9 deletions(-)

diff --cc CHANGES.txt
index 0c50b0a,c326801..7f1930d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,7 +1,28 @@@
 -3.11.7
 - * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
 +4.0-alpha5
 + * Add tunable initial size and growth factor to RangeTombstoneList (CASSANDRA-15763)
 + * Improve debug logging in SSTableReader for index summary (CASSANDRA-15755)
 + * bin/sstableverify should support user provided token ranges (CASSANDRA-15753)
 + * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
 + * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
 + * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
 + * Avoid race condition when completing stream sessions (CASSANDRA-15666)
 + * Flush with fast compressors by default (CASSANDRA-15379)
 + * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
 + * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
 + * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
 + * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
 + * Fix regression with traceOutgoingMessage printing message size (CASSANDRA-15687)
 + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601)
 + * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
 + * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
 + * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
 + * Improve logging around incremental repair (CASSANDRA-15599)
 + * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
 + * Replace array iterators with get by index (CASSANDRA-15394)
 + * Minimize BTree iterator allocations (CASSANDRA-15389)
 +Merged from 3.11:
  Merged from 3.0:
+  * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
   * Fix Debian init start/stop (CASSANDRA-15770)
   * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
   * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index a129e41,5994707..574c6a4
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@@ -20,8 -20,8 +20,7 @@@ package org.apache.cassandra.db.lifecyc
  import java.io.File;
  import java.nio.file.Path;
  import java.util.*;
--import java.util.function.BiFunction;
 -
 +import java.util.function.BiPredicate;
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
  import com.google.common.collect.*;
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 1300c99,b914963..90a8621
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@@ -28,7 -28,8 +28,6 @@@ import java.util.Map
  import java.util.UUID;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ImmutableList;
--import com.google.common.collect.Iterables;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -41,7 -41,7 +39,8 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -256,15 -262,40 +255,40 @@@ public class IndexSummaryRedistributio
              logger.trace("Re-sampling index summary for {} from {}/{} to {}/{} of the original number of entries",
                           sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
                           entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
 -            ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
 +            ColumnFamilyStore cfs = Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id);
+             long oldSize = sstable.bytesOnDisk();
              SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+             long newSize = replacement.bytesOnDisk();
              newSSTables.add(replacement);
 -            transactions.get(sstable.metadata.cfId).update(replacement, true);
 +            transactions.get(sstable.metadata().id).update(replacement, true);
+             addHooks(cfs, transactions, oldSize, newSize);
          }
  
          return newSSTables;
      }
  
+     /**
+      * Add hooks to correctly update the storage load metrics once the transaction is committed or aborted
+      */
+     @SuppressWarnings("resource") // Transactions are closed in finally outside of this method
 -    private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize)
++    private void addHooks(ColumnFamilyStore cfs, Map<TableId, LifecycleTransaction> transactions, long oldSize, long newSize)
+     {
 -        LifecycleTransaction txn = transactions.get(cfs.metadata.cfId);
++        LifecycleTransaction txn = transactions.get(cfs.metadata.id);
+         txn.runOnCommit(() -> {
+             // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
+             StorageMetrics.load.dec(oldSize);
+             cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+             cfs.metric.totalDiskSpaceUsed.dec(oldSize);
+         });
+         txn.runOnAbort(() -> {
+             // the local disk was modified but the bookkeeping could not be committed, so apply the delta
+             long delta = oldSize - newSize; // if new is larger this will be negative, so dec will become an inc
+             StorageMetrics.load.dec(delta);
+             cfs.metric.liveDiskSpaceUsed.dec(delta);
+             cfs.metric.totalDiskSpaceUsed.dec(delta);
+         });
+     }
+ 
      @VisibleForTesting
      static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
      {
diff --cc test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
index 0000000,ddacc6b..73d5e22
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@@ -1,0 -1,139 +1,138 @@@
+ package org.apache.cassandra.io;
+ 
+ import java.io.IOException;
 -import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
 -import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.Lists;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.io.sstable.IndexSummaryManager;
+ import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ public class DiskSpaceMetricsTest extends CQLTester
+ {
+     /**
+      * This test runs the system with normal operations and makes sure the disk metrics match reality
+      */
+     @Test
+     public void baseline() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ 
+         // disable compaction so nothing changes between calculations
+         cfs.disableAutoCompaction();
+ 
+         // create 100 sstables
+         for (int i = 0; i < 100; i++)
+             insert(cfs, i);
+         assertDiskSpaceEqual(cfs);
+     }
+ 
+     /**
+      * If index summary downsampling is interrupted in the middle, the metrics still reflect the real data
+      */
+     @Test
+     public void summaryRedistribution() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ 
+         // disable compaction so nothing changes between calculations
+         cfs.disableAutoCompaction();
+ 
+         // create 100 sstables, make sure they have more than 1 value, else sampling can't happen
+         for (int i = 0; i < 100; i++)
+             insertN(cfs, 10, i);
+         assertDiskSpaceEqual(cfs);
+ 
+         // summary downsample
+         for (int i = 0; i < 100; i++)
+         {
+             indexDownsampleCancelLastSSTable(cfs);
+             assertDiskSpaceEqual(cfs);
+         }
+     }
+ 
+     private void insert(ColumnFamilyStore cfs, long value) throws Throwable
+     {
+         insertN(cfs, 1, value);
+     }
+ 
+     private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable
+     {
+         for (int i = 0; i < n; i++)
+             execute("INSERT INTO %s (pk) VALUES (?)", base + i);
+ 
+         // flush to write the sstable
+         cfs.forceBlockingFlush();
+     }
+ 
+     private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
+     {
+         long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
+         long actual = 0;
+         for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+             actual += sstable.bytesOnDisk();
+ 
+         Assert.assertEquals("bytes on disk does not match current metric liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+ 
+         // totalDiskSpaceUsed is updated on SSTable deletion, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
+         // wait for there to be no more pending sstable releases
+         LifecycleTransaction.waitForDeletions();
+         long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
+         Assert.assertEquals("bytes on disk does not match current metric totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+     }
+ 
+     private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
+     {
+         List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL));
+         LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
 -        Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn);
++        Map<TableId, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.id, txn);
+         // fail on the last file (* 3 because we call isStopRequested 3 times for each sstable, and we should fail on the last)
+         AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1);
 -        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0) {
++        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(txns, 0, 0) {
+             public boolean isStopRequested()
+             {
+                 return countdown.decrementAndGet() == 0;
+             }
+         };
+         try
+         {
+             IndexSummaryManager.redistributeSummaries(redistribution);
+             Assert.fail("Should throw CompactionInterruptedException");
+         }
+         catch (CompactionInterruptedException e)
+         {
+             // expected: the interruption is exactly what this helper is trying to trigger
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+         finally
+         {
+             try
+             {
+                 FBUtilities.closeAll(txns.values());
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException(e);
+             }
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org