You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2020/08/17 09:43:38 UTC

[cassandra] branch cassandra-3.11 updated (2beebbb -> ecd23f1)

This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 2beebbb  Merge branch 'cassandra-3.0' into cassandra-3.11
     new e2ecdf2  Remove broken "defragment-on-read" optimization
     new ecd23f1  Merge commit 'e2ecdf268a82fa3ac0f4c9fe77ab35bca33cc72a' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 24 ----------------------
 .../db/compaction/AbstractCompactionStrategy.java  |  5 -----
 .../db/compaction/CompactionStrategyManager.java   |  7 -------
 .../compaction/SizeTieredCompactionStrategy.java   |  6 ------
 5 files changed, 1 insertion(+), 42 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge commit 'e2ecdf268a82fa3ac0f4c9fe77ab35bca33cc72a' into cassandra-3.11

Posted by sl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ecd23f1da5894511cccac6c8445f962f3b73f733
Merge: 2beebbb e2ecdf2
Author: Sylvain Lebresne <le...@gmail.com>
AuthorDate: Mon Aug 17 11:32:46 2020 +0200

    Merge commit 'e2ecdf268a82fa3ac0f4c9fe77ab35bca33cc72a' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 24 ----------------------
 .../db/compaction/AbstractCompactionStrategy.java  |  5 -----
 .../db/compaction/CompactionStrategyManager.java   |  7 -------
 .../compaction/SizeTieredCompactionStrategy.java   |  6 ------
 5 files changed, 1 insertion(+), 42 deletions(-)

diff --cc CHANGES.txt
index 5168acb,9b4f8c3..a6bc9d9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.22:
 +3.11.8
 + * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
 +Merged from 3.0:
+  * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
   * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907)
   * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index a820a89,ca4e8e3..c5de444
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -918,8 -882,7 +918,7 @@@ public class SinglePartitionReadComman
          }
  
          /* add the SSTables on disk */
 -        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +        Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
-         boolean onlyUnrepaired = true;
          // read sorted sstables
          SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
          for (SSTableReader sstable : view.sstables)
@@@ -1012,28 -967,7 +1008,8 @@@
  
          DecoratedKey key = result.partitionKey();
          cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 +        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
  
-         // "hoist up" the requested data into a more recent sstable
-         if (metricsCollector.getMergedSSTables() > cfs.getMinimumCompactionThreshold()
-             && onlyUnrepaired
-             && !cfs.isAutoCompactionDisabled()
-             && cfs.getCompactionStrategyManager().shouldDefragment())
-         {
-             // !!WARNING!!   if we stop copying our data to a heap-managed object,
-             //               we will need to track the lifetime of this mutation as well
-             Tracing.trace("Defragmenting requested data");
- 
-             try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
-             {
-                 final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
-                 StageManager.getStage(Stage.MUTATION).execute(() -> {
-                     // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
-                     Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
-                 });
-             }
-         }
- 
          return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
      }
  
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 86170a1,5d4a254..d486679
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -102,11 -64,8 +102,10 @@@ public class CompactionStrategyManager 
  
          If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
          we will use the new compaction parameters.
 -     */
 -    private CompactionParams schemaCompactionParams;
 +     **/
 +    private volatile CompactionParams schemaCompactionParams;
-     private boolean shouldDefragment;
 +    private boolean supportsEarlyOpen;
 +    private int fanout;
  
      public CompactionStrategyManager(ColumnFamilyStore cfs)
      {
@@@ -206,28 -129,13 +205,27 @@@
  
      private void startup()
      {
 -        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        writeLock.lock();
 +        try
          {
 -            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 -                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +            {
 +                if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                    compactionStrategyFor(sstable).addSSTable(sstable);
 +            }
 +            repaired.forEach(AbstractCompactionStrategy::startup);
 +            unrepaired.forEach(AbstractCompactionStrategy::startup);
-             shouldDefragment = repaired.get(0).shouldDefragment();
 +            supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
 +            fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
          }
 -        repaired.startup();
 -        unrepaired.startup();
 +        finally
 +        {
 +            writeLock.unlock();
 +        }
 +        repaired.forEach(AbstractCompactionStrategy::startup);
 +        unrepaired.forEach(AbstractCompactionStrategy::startup);
 +        if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
 +            compactionLogger.enable();
      }
  
      /**
@@@ -472,124 -235,75 +470,119 @@@
          return res;
      }
  
-     public boolean shouldDefragment()
-     {
-         return shouldDefragment;
-     }
- 
      public Directories getDirectories()
      {
 -        assert repaired.getClass().equals(unrepaired.getClass());
 -        return repaired.getDirectories();
 +        maybeReloadDiskBoundaries();
 +        readLock.lock();
 +        try
 +        {
 +            assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
 +            return repaired.get(0).getDirectories();
 +        }
 +        finally
 +        {
 +            readLock.unlock();
 +        }
      }
  
 -    public synchronized void handleNotification(INotification notification, Object sender)
 +    private void handleFlushNotification(Iterable<SSTableReader> added)
      {
 -        if (notification instanceof SSTableAddedNotification)
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +
 +        readLock.lock();
 +        try
          {
 -            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 -            for (SSTableReader sstable : flushedNotification.added)
 -            {
 -                if (sstable.isRepaired())
 -                    repaired.addSSTable(sstable);
 -                else
 -                    unrepaired.addSSTable(sstable);
 -            }
 +            for (SSTableReader sstable : added)
 +                compactionStrategyFor(sstable).addSSTable(sstable);
          }
 -        else if (notification instanceof SSTableListChangedNotification)
 +        finally
          {
 -            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
 -            Set<SSTableReader> repairedRemoved = new HashSet<>();
 -            Set<SSTableReader> repairedAdded = new HashSet<>();
 -            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 -            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +            readLock.unlock();
 +        }
 +    }
 +
 +    private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
 +    {
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +
 +        readLock.lock();
 +        try
 +        {
 +            // a bit of gymnastics to be able to replace sstables in compaction strategies
 +            // we use this to know that a compaction finished and where to start the next compaction in LCS
 +            int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
  
 -            for (SSTableReader sstable : listChangedNotification.removed)
 +            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
 +
 +            for (int i = 0; i < locationSize; i++)
              {
 -                if (sstable.isRepaired())
 -                    repairedRemoved.add(sstable);
 -                else
 -                    unrepairedRemoved.add(sstable);
 +                repairedRemoved.add(new HashSet<>());
 +                repairedAdded.add(new HashSet<>());
 +                unrepairedRemoved.add(new HashSet<>());
 +                unrepairedAdded.add(new HashSet<>());
              }
 -            for (SSTableReader sstable : listChangedNotification.added)
 +
 +            for (SSTableReader sstable : removed)
              {
 +                int i = compactionStrategyIndexFor(sstable);
                  if (sstable.isRepaired())
 -                    repairedAdded.add(sstable);
 +                    repairedRemoved.get(i).add(sstable);
                  else
 -                    unrepairedAdded.add(sstable);
 +                    unrepairedRemoved.get(i).add(sstable);
              }
 -            if (!repairedRemoved.isEmpty())
 +            for (SSTableReader sstable : added)
              {
 -                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +                int i = compactionStrategyIndexFor(sstable);
 +                if (sstable.isRepaired())
 +                    repairedAdded.get(i).add(sstable);
 +                else
 +                    unrepairedAdded.get(i).add(sstable);
              }
 -            else
 +            for (int i = 0; i < locationSize; i++)
              {
 -                for (SSTableReader sstable : repairedAdded)
 -                    repaired.addSSTable(sstable);
 -            }
 +                if (!repairedRemoved.get(i).isEmpty())
 +                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
 +                else
 +                    repaired.get(i).addSSTables(repairedAdded.get(i));
  
 -            if (!unrepairedRemoved.isEmpty())
 -            {
 -                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 -            }
 -            else
 -            {
 -                for (SSTableReader sstable : unrepairedAdded)
 -                    unrepaired.addSSTable(sstable);
 +                if (!unrepairedRemoved.get(i).isEmpty())
 +                    unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
 +                else
 +                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
              }
          }
 -        else if (notification instanceof SSTableRepairStatusChanged)
 +        finally
          {
 -            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            readLock.unlock();
 +        }
 +    }
 +
 +    private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
 +    {
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +        // we need a write lock here since we move sstables from one strategy instance to another
 +        readLock.lock();
 +        try
 +        {
 +            for (SSTableReader sstable : sstables)
              {
 +                int index = compactionStrategyIndexFor(sstable);
                  if (sstable.isRepaired())
                  {
 -                    unrepaired.removeSSTable(sstable);
 -                    repaired.addSSTable(sstable);
 +                    unrepaired.get(index).removeSSTable(sstable);
 +                    repaired.get(index).addSSTable(sstable);
                  }
                  else
                  {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org