Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/02/17 09:49:07 UTC

[cassandra] branch cassandra-3.11 updated (f0bb299 -> e53ad64)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from f0bb299  Merge branch 'cassandra-3.0' into cassandra-3.11
     new b58a5c8  LeveledCompactionStrategy disk space check improvements
     new e53ad64  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java    |   6 +-
 .../db/compaction/LeveledCompactionTask.java       |  45 +++++-
 .../compaction/writers/CompactionAwareWriter.java  |   7 +-
 .../writers/MajorLeveledCompactionWriter.java      |   6 +
 .../compaction/writers/MaxSSTableSizeWriter.java   |   6 +
 .../SplittingSizeTieredCompactionWriter.java       |   8 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 152 +++++++++++++++++++++
 .../{ => writers}/CompactionAwareWriterTest.java   |  48 ++++++-
 9 files changed, 267 insertions(+), 12 deletions(-)
 rename test/unit/org/apache/cassandra/db/compaction/{ => writers}/CompactionAwareWriterTest.java (84%)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e53ad6461224b1ab096f56d9e2c744126eb532cd
Merge: f0bb299 b58a5c8
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Feb 17 10:30:05 2022 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java    |   6 +-
 .../db/compaction/LeveledCompactionTask.java       |  45 +++++-
 .../compaction/writers/CompactionAwareWriter.java  |   7 +-
 .../writers/MajorLeveledCompactionWriter.java      |   6 +
 .../compaction/writers/MaxSSTableSizeWriter.java   |   6 +
 .../SplittingSizeTieredCompactionWriter.java       |   8 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 152 +++++++++++++++++++++
 .../{ => writers}/CompactionAwareWriterTest.java   |  48 ++++++-
 9 files changed, 267 insertions(+), 12 deletions(-)

diff --cc CHANGES.txt
index 402048f,4d07a3d..513a8af
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,9 +1,22 @@@
 -3.0.27
 - * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
 +3.11.13
 +Merged from 3.0:
   * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273)
++ * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
  
  
 -3.0.26
 +3.11.12
 + * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA-17028)
 + * Add key validation to sstablescrub (CASSANDRA-16969)
 + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 + * Fix ant-junit dependency issue (CASSANDRA-16827)
 + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 + * Avoid sending CDC column if not enabled (CASSANDRA-16770)
 +Merged from 3.0:
   * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243)
   * Upgrade logback to 1.2.9 (CASSANDRA-17204)
   * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index bc6115e,1ceed1c..7ffe33a
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@@ -158,26 -124,9 +158,26 @@@ public abstract class CompactionAwareWr
       */
      protected void maybeSwitchWriter(DecoratedKey key)
      {
 -        if (!isInitialized)
 -            switchCompactionLocation(getDirectories().getWriteableLocation(getExpectedWriteSize()));
 -        isInitialized = true;
 +        if (diskBoundaries == null)
 +        {
 +            if (locationIndex < 0)
 +            {
-                 Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN));
++                Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, getExpectedWriteSize());
 +                switchCompactionLocation(defaultLocation);
 +                locationIndex = 0;
 +            }
 +            return;
 +        }
 +
 +        if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
 +            return;
 +
 +        int prevIdx = locationIndex;
 +        while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
 +            locationIndex++;
 +        if (prevIdx >= 0)
 +            logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
 +        switchCompactionLocation(locations.get(locationIndex));
      }
  
      /**
@@@ -198,45 -147,18 +198,50 @@@
  
      /**
       * Return a directory where we can expect expectedWriteSize to fit.
 +     *
 +     * @param sstables the sstables to compact
 +     * @return a data directory with enough space for the compaction result
       */
 -    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
 +    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
      {
 -        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
 -        if (directory == null)
 -            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
 +        Descriptor descriptor = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (descriptor == null)
 +                descriptor = sstable.descriptor;
 +            if (!descriptor.directory.equals(sstable.descriptor.directory))
 +            {
 +                logger.trace("All sstables not from the same disk - putting results in {}", descriptor.directory);
 +                break;
 +            }
 +        }
 +        Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(descriptor);
 +        if (d != null)
 +        {
 +            long availableSpace = d.getAvailableSpace();
 +            if (availableSpace < estimatedWriteSize)
 +                throw new RuntimeException(String.format("Not enough space to write %s to %s (%s available)",
 +                                                         FBUtilities.prettyPrintMemory(estimatedWriteSize),
 +                                                         d.location,
 +                                                         FBUtilities.prettyPrintMemory(availableSpace)));
 +            logger.trace("putting compaction results in {}", descriptor.directory);
 +            return d;
 +        }
 +        d = getDirectories().getWriteableLocation(estimatedWriteSize);
 +        if (d == null)
 +            throw new RuntimeException(String.format("Not enough disk space to store %s",
 +                                                     FBUtilities.prettyPrintMemory(estimatedWriteSize)));
 +        return d;
 +    }
  
 -        return directory;
 +    public CompactionAwareWriter setRepairedAt(long repairedAt)
 +    {
 +        this.sstableWriter.setRepairedAt(repairedAt);
 +        return this;
      }
+ 
+     protected long getExpectedWriteSize()
+     {
+         return cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
+     }
  }
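
For readers skimming the hunk above: a minimal standalone sketch of the
disk-space check that getWriteDirectory() now performs. Prefer the disk the
input sstables already live on, fail fast if it cannot fit the estimated
output, and otherwise fall back to any directory with enough room. The
DataDir type and pick() helper are hypothetical stand-ins for illustration,
not Cassandra's Directories API.

    import java.util.List;

    // Hypothetical stand-in for Directories.DataDirectory: a location plus
    // the space currently available on it.
    final class DataDir
    {
        final String path;
        final long availableBytes;

        DataDir(String path, long availableBytes)
        {
            this.path = path;
            this.availableBytes = availableBytes;
        }
    }

    final class WriteDirectoryPicker
    {
        // Mirrors the shape of getWriteDirectory() above: a preferred disk
        // that is too small is an error, not a reason to silently spill
        // elsewhere; only when no preferred disk exists do we scan for room.
        static DataDir pick(DataDir preferred, List<DataDir> all, long estimatedWriteSize)
        {
            if (preferred != null)
            {
                if (preferred.availableBytes < estimatedWriteSize)
                    throw new RuntimeException(String.format("Not enough space to write %d bytes to %s (%d available)",
                                                             estimatedWriteSize, preferred.path, preferred.availableBytes));
                return preferred;
            }
            for (DataDir d : all)
                if (d.availableBytes >= estimatedWriteSize)
                    return d;
            throw new RuntimeException("Not enough disk space to store " + estimatedWriteSize + " bytes");
        }
    }
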
diff --cc src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 0beb505,3eee398..f1326e9
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@@ -100,21 -96,23 +100,27 @@@ public class MajorLeveledCompactionWrit
  
      }
  
 -    public void switchCompactionLocation(Directories.DataDirectory directory)
 +    @Override
 +    public void switchCompactionLocation(Directories.DataDirectory location)
      {
 -        File sstableDirectory = getDirectories().getLocationForDisk(directory);
 -        @SuppressWarnings("resource")
 -        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
 -                                                    averageEstimatedKeysPerSSTable,
 -                                                    minRepairedAt,
 -                                                    cfs.metadata,
 -                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
 -                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
 -                                                    txn);
 -        sstableWriter.switchWriter(writer);
 +        this.sstableDirectory = location;
 +        averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
 +        sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
 +                keysPerSSTable,
 +                minRepairedAt,
 +                cfs.metadata,
 +                new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
 +                SerializationHeader.make(cfs.metadata, txn.originals()),
 +                cfs.indexManager.listIndexes(),
 +                txn));
 +        partitionsWritten = 0;
 +        sstablesWritten = 0;
 +
      }
+ 
+     @Override
+     protected long getExpectedWriteSize()
+     {
 -        return expectedWriteSize;
++        return Math.min(maxSSTableSize, super.getExpectedWriteSize());
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 864185e,d76381a..36c69b0
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@@ -118,5 -103,12 +118,11 @@@ public class MaxSSTableSizeWriter exten
                                                      txn);
  
          sstableWriter.switchWriter(writer);
 -
      }
+ 
+     @Override
+     protected long getExpectedWriteSize()
+     {
 -        return expectedWriteSize;
++        return Math.min(maxSSTableSize, super.getExpectedWriteSize());
+     }
  }
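
MajorLeveledCompactionWriter and MaxSSTableSizeWriter override
getExpectedWriteSize() the same way. A minimal sketch of the pattern, with
illustrative names and sizes rather than Cassandra's classes: the base class
estimates the whole compaction output, while writers that cap individual
sstables only ever need room for one max-size sstable at a time.

    abstract class SketchWriter
    {
        // default: reserve space for the entire expected compaction output
        protected long getExpectedWriteSize()
        {
            return totalCompactedBytes();
        }

        abstract long totalCompactedBytes();
    }

    class SizeCappedSketchWriter extends SketchWriter
    {
        private final long maxSSTableSize;

        SizeCappedSketchWriter(long maxSSTableSize)
        {
            this.maxSSTableSize = maxSSTableSize;
        }

        long totalCompactedBytes()
        {
            return 5L * 1024 * 1024 * 1024; // say the inputs sum to 5 GiB
        }

        @Override
        protected long getExpectedWriteSize()
        {
            // with a 160 MiB cap the disk check reserves 160 MiB, not 5 GiB
            return Math.min(maxSSTableSize, super.getExpectedWriteSize());
        }
    }
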
diff --cc src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 46cb891,77672d8..b4d7097
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@@ -88,12 -92,11 +88,12 @@@ public class SplittingSizeTieredCompact
      public boolean realAppend(UnfilteredRowIterator partition)
      {
          RowIndexEntry rie = sstableWriter.append(partition);
 -        if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
 +        if (sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
          {
              currentRatioIndex++;
-             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
+             currentBytesToWrite = getExpectedWriteSize();
 -            switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
 +            switchCompactionLocation(location);
 +            logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite);
          }
          return rie != null;
      }
@@@ -114,5 -114,11 +114,11 @@@
                                                      txn);
          logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
          sstableWriter.switchWriter(writer);
 -
      }
+ 
++    @Override
+     protected long getExpectedWriteSize()
+     {
+         return Math.round(totalSize * ratios[currentRatioIndex]);
+     }
  }
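
Since getExpectedWriteSize() here depends on the split ratios, a small worked
example may help. The halving ratios below illustrate how
SplittingSizeTieredCompactionWriter sizes successive output sstables (the
concrete values are for demonstration only):

    public class RatioSizingDemo
    {
        public static void main(String[] args)
        {
            // each sstable targets half the previous one; the last ratio is
            // repeated so the ratios sum to 1.0
            double[] ratios = { 0.5, 0.25, 0.125, 0.0625, 0.0625 };
            long totalSize = 1024L * 1024 * 1024; // a 1 GiB compaction

            for (int i = 0; i < ratios.length; i++)
                System.out.printf("sstable %d target: %d bytes%n",
                                  i, Math.round(totalSize * ratios[i]));
            // prints 536870912, 268435456, 134217728, 67108864, 67108864
        }
    }
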
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index eba243f,5bbc931..91b9b3b
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -28,16 -29,17 +29,21 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Random;
+ import java.util.Set;
  import java.util.UUID;
 +import java.util.stream.Collectors;
  
 -import com.google.common.collect.Iterables;
 -import com.google.common.collect.Sets;
 +import junit.framework.Assert;
  import org.junit.After;
 -import org.junit.Assert;
  import org.junit.Before;
  import org.junit.BeforeClass;
++
++import com.google.common.collect.Iterables;
++import com.google.common.collect.Sets;
++import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.junit.Test;
  import org.junit.runner.RunWith;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -62,11 -64,12 +68,14 @@@ import org.apache.cassandra.schema.Comp
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  
 +import static java.util.Collections.singleton;
 +import static org.assertj.core.api.Assertions.assertThat;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
@@@ -376,351 -380,146 +385,494 @@@ public class LeveledCompactionStrategyT
          assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
      }
  
 +
 +
 +    @Test
 +    public void testTokenRangeCompaction() throws Exception
 +    {
 +        // Remove any existing data so we can start out clean with predictable number of sstables
 +        cfs.truncateBlocking();
 +
 +        // Disable auto compaction so cassandra does not compact
 +        CompactionManager.instance.disableAutoCompaction();
 +
 +        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 +
 +        DecoratedKey key1 = Util.dk(String.valueOf(1));
 +        DecoratedKey key2 = Util.dk(String.valueOf(2));
 +        List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2));
 +        int numIterations = 10;
 +        int columns = 2;
 +
 +        // Add enough data to trigger multiple sstables.
 +
 +        // create 10 sstables that contain data for both key1 and key2
 +        for (int i = 0; i < numIterations; i++) {
 +            for (DecoratedKey key : keys) {
 +                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
 +                for (int c = 0; c < columns; c++)
 +                    update.newRow("column" + c).add("val", value);
 +                update.applyUnsafe();
 +            }
 +            cfs.forceBlockingFlush();
 +        }
 +
 +        // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2
 +        for (int i = 0; i < numIterations; i++) {
 +            for (DecoratedKey key : keys) {
 +                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
 +                for (int c = 0; c < columns; c++)
 +                    update.newRow("column" + c).add("val", value);
 +                update.applyUnsafe();
 +                cfs.forceBlockingFlush();
 +            }
 +        }
 +
 +        // We should have a total of 30 sstables by now
 +        assertEquals(30, cfs.getLiveSSTables().size());
 +
 +        // Compact just the tables with key2
 +        // Bit hackish to use the key1.token as the prior key but works in BytesToken
 +        Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken());
 +        Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
 +        cfs.forceCompactionForTokenRange(tokenRanges);
 +
 +        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
 +            Thread.sleep(100);
 +        }
 +
 +        // 20 tables that have key2 should have been compacted into 1 table, resulting in 11 (30-20+1)
 +        assertEquals(11, cfs.getLiveSSTables().size());
 +
 +        // Compact just the tables with key1. At this point all 11 tables should have key1
 +        Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken());
 +        Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2));
 +        cfs.forceCompactionForTokenRange(tokenRanges2);
 +
 +
 +        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
 +            Thread.sleep(100);
 +        }
 +
 +        // the 11 tables containing key1 should all compact to 1 table
 +        assertEquals(1, cfs.getLiveSSTables().size());
 +    }
 +
 +    @Test
 +    public void testCompactionCandidateOrdering() throws Exception
 +    {
 +        // add some data
 +        byte [] b = new byte[100 * 1024];
 +        new Random().nextBytes(b);
 +        ByteBuffer value = ByteBuffer.wrap(b);
 +        int rows = 4;
 +        int columns = 10;
 +        // Just keep sstables in L0 for this test
 +        cfs.disableAutoCompaction();
 +        for (int r = 0; r < rows; r++)
 +        {
 +            UpdateBuilder update = UpdateBuilder.create(cfs.metadata, String.valueOf(r));
 +            for (int c = 0; c < columns; c++)
 +                update.newRow("column" + c).add("val", value);
 +            update.applyUnsafe();
 +            cfs.forceBlockingFlush();
 +        }
 +        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1).get(0);
 +        // get readers for level 0 sstables
 +        Collection<SSTableReader> sstables = strategy.manifest.getLevel(0);
 +        Collection<SSTableReader> sortedCandidates = strategy.manifest.ageSortedSSTables(sstables);
 +        assertTrue(String.format("More than 1 sstable required for test, found: %d .", sortedCandidates.size()), sortedCandidates.size() > 1);
 +        long lastMaxTimeStamp = Long.MIN_VALUE;
 +        for (SSTableReader sstable : sortedCandidates)
 +        {
 +            assertTrue(String.format("SStables not sorted into oldest to newest by maxTimestamp. Current sstable: %d , last sstable: %d", sstable.getMaxTimestamp(), lastMaxTimeStamp),
 +                       sstable.getMaxTimestamp() > lastMaxTimeStamp);
 +            lastMaxTimeStamp = sstable.getMaxTimestamp();
 +        }
 +    }
 +
 +    @Test
 +    public void testAddingOverlapping()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        List<SSTableReader> currentLevel = new ArrayList<>();
 +        int gen = 1;
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 10, 20, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 21, 30, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 51, 100, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 80, 120, 1, cfs));
 +        currentLevel.add(MockSchema.sstableWithLevel(gen++, 90, 150, 1, cfs));
 +
 +        lm.addSSTables(currentLevel);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertLevelsEqual(lm.getLevel(0), currentLevel.subList(3, 5));
 +
 +        List<SSTableReader> newSSTables = new ArrayList<>();
 +        // this sstable last token is the same as the first token of L1 above, should get sent to L0:
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 5, 10, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 30, 40, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        assertLevelsEqual(lm.getLevel(1), currentLevel.subList(0, 3));
 +        assertEquals(0, newSSTables.get(0).getSSTableLevel());
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +
 +        newSSTables.clear();
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 100, 140, 1, cfs));
 +        newSSTables.add(MockSchema.sstableWithLevel(gen++, 120, 140, 1, cfs));
 +        lm.addSSTables(newSSTables);
 +        List<SSTableReader> newL1 = new ArrayList<>(currentLevel.subList(0, 3));
 +        newL1.add(newSSTables.get(1));
 +        assertLevelsEqual(lm.getLevel(1), newL1);
 +        newSSTables.remove(1);
 +        assertTrue(newSSTables.stream().allMatch(s -> s.getSSTableLevel() == 0));
 +        assertTrue(lm.getLevel(0).containsAll(newSSTables));
 +    }
 +
 +    @Test
 +    public void singleTokenSSTableTest()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        List<SSTableReader> expectedL1 = new ArrayList<>();
 +
 +        int gen = 1;
 +        // single sstable, single token (100)
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
 +        lm.addSSTables(expectedL1);
 +
 +        List<SSTableReader> expectedL0 = new ArrayList<>();
 +
 +        // should get moved to L0:
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 101, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 101, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 99, 100, 1, cfs));
 +        expectedL0.add(MockSchema.sstableWithLevel(gen++, 100, 100, 1, cfs));
 +        lm.addSSTables(expectedL0);
 +
 +        assertLevelsEqual(expectedL0, lm.getLevel(0));
 +        assertTrue(expectedL0.stream().allMatch(s -> s.getSSTableLevel() == 0));
 +        assertLevelsEqual(expectedL1, lm.getLevel(1));
 +        assertTrue(expectedL1.stream().allMatch(s -> s.getSSTableLevel() == 1));
 +
 +        // should work:
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 98, 99, 1, cfs));
 +        expectedL1.add(MockSchema.sstableWithLevel(gen++, 101, 101, 1, cfs));
 +        lm.addSSTables(expectedL1.subList(1, expectedL1.size()));
 +        assertLevelsEqual(expectedL1, lm.getLevel(1));
 +    }
 +
 +    @Test
 +    public void randomMultiLevelAddTest()
 +    {
 +        int iterations = 100;
 +        int levelCount = 9;
 +
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
 +        long seed = System.currentTimeMillis();
 +        Random r = new Random(seed);
 +        List<SSTableReader> newLevels = generateNewRandomLevels(cfs, 40, levelCount, 0, r);
 +
 +        int sstableCount = newLevels.size();
 +        lm.addSSTables(newLevels);
 +
 +        int [] expectedLevelSizes = lm.getAllLevelSize();
 +
 +        for (int j = 0; j < iterations; j++)
 +        {
 +            newLevels = generateNewRandomLevels(cfs, 20, levelCount, sstableCount, r);
 +            sstableCount += newLevels.size();
 +
 +            int[] canAdd = canAdd(lm, newLevels, levelCount);
 +            for (int i = 0; i < levelCount; i++)
 +                expectedLevelSizes[i] += canAdd[i];
 +            lm.addSSTables(newLevels);
 +        }
 +
 +        // and verify no levels overlap
 +        int actualSSTableCount = 0;
 +        for (int i = 0; i < levelCount; i++)
 +        {
 +            actualSSTableCount += lm.getLevelSize(i);
 +            List<SSTableReader> level = new ArrayList<>(lm.getLevel(i));
 +            int lvl = i;
 +            assertTrue(level.stream().allMatch(s -> s.getSSTableLevel() == lvl));
 +            if (i > 0)
 +            {
 +                level.sort(SSTableReader.sstableComparator);
 +                SSTableReader prev = null;
 +                for (SSTableReader sstable : level)
 +                {
 +                    if (prev != null && sstable.first.compareTo(prev.last) <= 0)
 +                    {
 +                        String levelStr = level.stream().map(s -> String.format("[%s, %s]", s.first, s.last)).collect(Collectors.joining(", "));
 +                        String overlap = String.format("sstable [%s, %s] overlaps with [%s, %s] in level %d (%s) ", sstable.first, sstable.last, prev.first, prev.last, i, levelStr);
 +                        Assert.fail("[seed = "+seed+"] overlap in level "+lvl+": " + overlap);
 +                    }
 +                    prev = sstable;
 +                }
 +            }
 +        }
 +        assertEquals(sstableCount, actualSSTableCount);
 +        for (int i = 0; i < levelCount; i++)
 +            assertEquals("[seed = " + seed + "] wrong sstable count in level = " + i, expectedLevelSizes[i], lm.getLevel(i).size());
 +    }
 +
 +    private static List<SSTableReader> generateNewRandomLevels(ColumnFamilyStore cfs, int maxSSTableCountPerLevel, int levelCount, int startGen, Random r)
 +    {
 +        List<SSTableReader> newLevels = new ArrayList<>();
 +        for (int level = 0; level < levelCount; level++)
 +        {
 +            int numLevelSSTables = r.nextInt(maxSSTableCountPerLevel) + 1;
 +            List<Integer> tokens = new ArrayList<>(numLevelSSTables * 2);
 +
 +            for (int i = 0; i < numLevelSSTables * 2; i++)
 +                tokens.add(r.nextInt(4000));
 +            Collections.sort(tokens);
 +            for (int i = 0; i < tokens.size() - 1; i += 2)
 +            {
 +                SSTableReader sstable = MockSchema.sstableWithLevel(++startGen, tokens.get(i), tokens.get(i + 1), level, cfs);
 +                newLevels.add(sstable);
 +            }
 +        }
 +        return newLevels;
 +    }
 +
 +    /**
 +     * brute-force checks if the new sstables can be added to the correct level in manifest
 +     *
 +     * @return count of expected sstables to add to each level
 +     */
 +    private static int[] canAdd(LeveledManifest lm, List<SSTableReader> newSSTables, int levelCount)
 +    {
 +        Map<Integer, Collection<SSTableReader>> sstableGroups = new HashMap<>();
 +        newSSTables.forEach(s -> sstableGroups.computeIfAbsent(s.getSSTableLevel(), k -> new ArrayList<>()).add(s));
 +
 +        int[] canAdd = new int[levelCount];
 +        for (Map.Entry<Integer, Collection<SSTableReader>> lvlGroup : sstableGroups.entrySet())
 +        {
 +            int level = lvlGroup.getKey();
 +            if (level == 0)
 +            {
 +                canAdd[0] += lvlGroup.getValue().size();
 +                continue;
 +            }
 +
 +            List<SSTableReader> newLevel = new ArrayList<>(lm.getLevel(level));
 +            for (SSTableReader sstable : lvlGroup.getValue())
 +            {
 +                newLevel.add(sstable);
 +                newLevel.sort(SSTableReader.sstableComparator);
 +
 +                SSTableReader prev = null;
 +                boolean kept = true;
 +                for (SSTableReader sst : newLevel)
 +                {
 +                    if (prev != null && prev.last.compareTo(sst.first) >= 0)
 +                    {
 +                        newLevel.remove(sstable);
 +                        kept = false;
 +                        break;
 +                    }
 +                    prev = sst;
 +                }
 +                if (kept)
 +                    canAdd[level] += 1;
 +                else
 +                    canAdd[0] += 1;
 +            }
 +        }
 +        return canAdd;
 +    }
 +
 +    private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<SSTableReader> l2)
 +    {
 +        assertEquals(l1.size(), l2.size());
 +        assertEquals(new HashSet<>(l1), new HashSet<>(l2));
 +    }
 +
 +    @Test
 +    public void testHighestLevelHasMoreDataThanSupported()
 +    {
 +        ColumnFamilyStore cfs = MockSchema.newCFS();
 +        int fanoutSize = 2; // to generate less sstables
 +        LeveledManifest lm = new LeveledManifest(cfs, 1, fanoutSize, new SizeTieredCompactionStrategyOptions());
 +
 +        // generate data for L7 to trigger compaction
 +        int l7 = 7;
 +        int maxBytesForL7 = (int) (Math.pow(fanoutSize, l7) * 1024 * 1024);
 +        int sstablesSizeForL7 = (int) (maxBytesForL7 * 1.001) + 1;
 +        List<SSTableReader> sstablesOnL7 = Collections.singletonList(MockSchema.sstableWithLevel( 1, sstablesSizeForL7, l7, cfs));
 +        lm.addSSTables(sstablesOnL7);
 +
 +        // generate data for L8 to trigger compaction
 +        int l8 = 8;
 +        int maxBytesForL8 = (int) (Math.pow(fanoutSize, l8) * 1024 * 1024);
 +        int sstablesSizeForL8 = (int) (maxBytesForL8 * 1.001) + 1;
 +        List<SSTableReader> sstablesOnL8 = Collections.singletonList(MockSchema.sstableWithLevel( 2, sstablesSizeForL8, l8, cfs));
 +        lm.addSSTables(sstablesOnL8);
 +
 +        // compaction for L8 sstables is not supposed to be run because there is no upper level to promote sstables
 +        // that's why we expect compaction candidates for L7 only
 +        Collection<SSTableReader> compactionCandidates = lm.getCompactionCandidates().sstables;
 +        assertThat(compactionCandidates).containsAll(sstablesOnL7);
 +        assertThat(compactionCandidates).doesNotContainAnyElementsOf(sstablesOnL8);
 +    }
++
+     @Test
+     public void testReduceScopeL0L1() throws IOException
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS();
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "LeveledCompactionStrategy");
+         localOptions.put("sstable_size_in_mb", "1");
+         cfs.setCompactionParameters(localOptions);
+         List<SSTableReader> l1sstables = new ArrayList<>();
+         for (int i = 0; i < 10; i++)
+         {
+             SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs);
+             l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1);
+             l1sstable.reloadSSTableMetadata();
+             l1sstables.add(l1sstable);
+         }
+         List<SSTableReader> l0sstables = new ArrayList<>();
+         for (int i = 10; i < 20; i++)
+             l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
 -
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables)))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024*1024, false);
+             SSTableReader lastRemoved = null;
+             boolean removed = true;
+             for (int i = 0; i < l0sstables.size(); i++)
+             {
+                 Set<SSTableReader> before = new HashSet<>(txn.originals());
+                 removed = task.reduceScopeForLimitedSpace(0);
 -                SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
++                SSTableReader removedSSTable = Iterables.getOnlyElement(Sets.difference(before, txn.originals()), null);
+                 if (removed)
+                 {
+                     assertNotNull(removedSSTable);
+                     assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                     assertEquals(0, removedSSTable.getSSTableLevel());
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l1after = sstables.right;
+ 
+                     assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                     assertEquals(before.size() - 1, txn.originals().size());
+                     lastRemoved = removedSSTable;
+                 }
+                 else
+                 {
+                     assertNull(removedSSTable);
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l0after = sstables.left;
+                     Set<SSTableReader> l1after = sstables.right;
+                     assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                     assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                 }
+             }
+             assertFalse(removed);
+         }
+     }
+ 
+     @Test
+     public void testReduceScopeL0()
+     {
+ 
+         List<SSTableReader> l0sstables = new ArrayList<>();
+         for (int i = 10; i < 20; i++)
+             l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
+ 
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024*1024, false);
+ 
+             SSTableReader lastRemoved = null;
+             boolean removed = true;
+             for (int i = 0; i < l0sstables.size(); i++)
+             {
+                 Set<SSTableReader> before = new HashSet<>(txn.originals());
+                 removed = task.reduceScopeForLimitedSpace(0);
+                 SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
+                 if (removed)
+                 {
+                     assertNotNull(removedSSTable);
+                     assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                     assertEquals(0, removedSSTable.getSSTableLevel());
+                     assertEquals(before.size() - 1, txn.originals().size());
+                     lastRemoved = removedSSTable;
+                 }
+                 else
+                 {
+                     assertNull(removedSSTable);
+                     Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                     Set<SSTableReader> l0after = sstables.left;
+                     assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                 }
+             }
+             assertFalse(removed);
+         }
+     }
+ 
+     @Test
+     public void testNoHighLevelReduction() throws IOException
+     {
+         List<SSTableReader> sstables = new ArrayList<>();
+         int i = 1;
+         for (; i < 5; i++)
+         {
+             SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+             sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+             sstable.reloadSSTableMetadata();
+             sstables.add(sstable);
+         }
+         for (; i < 10; i++)
+         {
+             SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+             sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+             sstable.reloadSSTableMetadata();
+             sstables.add(sstable);
+         }
+         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
+         {
+             CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false);
+             assertFalse(task.reduceScopeForLimitedSpace(0));
+             assertEquals(new HashSet<>(sstables), txn.originals());
+         }
+     }
+ 
+     private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables)
+     {
+         Set<SSTableReader> l1after = new HashSet<>();
+         Set<SSTableReader> l0after = new HashSet<>();
 -        for (SSTableReader kept : sstables)
++        for (SSTableReader sstable : sstables)
+         {
 -            switch (kept.getSSTableLevel())
++            switch (sstable.getSSTableLevel())
+             {
+                 case 0:
 -                    l0after.add(kept);
++                    l0after.add(sstable);
+                     break;
+                 case 1:
 -                    l1after.add(kept);
++                    l1after.add(sstable);
+                     break;
+                 default:
+                     throw new RuntimeException("only l0 & l1 sstables");
+             }
+         }
+         return Pair.create(l0after, l1after);
+     }
++
  }
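
The three new reduceScopeForLimitedSpace tests above pin down a simple
contract: when disk space is short, drop the largest remaining L0 sstable
from the candidate set, never touch L1 or higher, and stop once a single L0
sstable is left. A standalone model of that contract, hypothetical and not
Cassandra's CompactionTask:

    import java.util.TreeMap;

    final class ScopeReductionModel
    {
        // L0 candidates keyed by on-disk size (assumes distinct sizes,
        // which is enough for a model)
        private final TreeMap<Long, String> l0BySize = new TreeMap<>();

        // returns true if an sstable was dropped from the compaction scope
        boolean reduceScopeForLimitedSpace()
        {
            if (l0BySize.size() <= 1)
                return false;        // keep at least one L0 sstable
            l0BySize.pollLastEntry(); // drop the largest L0 sstable
            return true;
        }
    }
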

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org