You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/12/23 13:54:06 UTC

[cassandra] branch cassandra-3.11 updated (baa9d0327f -> 0767c83416)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


    from baa9d0327f Merge branch 'cassandra-3.0' into cassandra-3.11
     new 730b898b74 Don't group TWCS sstables for anticompaction
     new 0767c83416 Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../compaction/TimeWindowCompactionStrategy.java   | 14 ++++++++++++
 test/unit/org/apache/cassandra/MockSchema.java     | 19 ++++++++++++----
 .../TimeWindowCompactionStrategyTest.java          | 25 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 5 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0767c834162d74bdc857f47615b9e8a5c7e76d5b
Merge: baa9d0327f 730b898b74
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Fri Dec 23 14:41:50 2022 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../compaction/TimeWindowCompactionStrategy.java   | 14 ++++++++++++
 test/unit/org/apache/cassandra/MockSchema.java     | 19 ++++++++++++----
 .../TimeWindowCompactionStrategyTest.java          | 25 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 5 deletions(-)

diff --cc CHANGES.txt
index ad25802839,3900ab5b58..ee21b34ebc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.29
 +3.11.15
 + * Fix Splitter sometimes creating more splits than requested (CASSANDRA-18013)
 +
 +Merged from 3.0:
+  * Avoid anticompaction mixing data from two different time windows with TWCS (CASSANDRA-17970)
   * Do not spam the logs with MigrationCoordinator not being able to pull schemas (CASSANDRA-18096)
   * Fix incorrect resource name in LIST PERMISSION output (CASSANDRA-17848)
   * Suppress CVE-2022-41854 and similar (CASSANDRA-18083)
diff --cc src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 74e5f9d95a,5ae1cc784c..bbc9cdf8ee
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@@ -376,9 -333,23 +376,23 @@@ public class TimeWindowCompactionStrate
          LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
          if (txn == null)
              return null;
 -        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
 +        return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps));
      }
  
+     /**
+      * TWCS should not group sstables for anticompaction - this can mix new and old data
+      */
+     @Override
+     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+     {
+         Collection<Collection<SSTableReader>> groups = new ArrayList<>(sstablesToGroup.size());
+         for (SSTableReader sstable : sstablesToGroup)
+         {
+             groups.add(Collections.singleton(sstable));
+         }
+         return groups;
+     }
+ 
      @Override
      @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
      public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
diff --cc test/unit/org/apache/cassandra/MockSchema.java
index 2b480d8e3c,5f3198dad0..90c8e4c705
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@@ -85,27 -86,18 +85,37 @@@ public class MockSchem
          return sstable(generation, size, false, cfs);
      }
  
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, keepRef, generation, generation, cfs);
 +    }
 +
 +    public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, 0, false, firstToken, lastToken, level, cfs);
 +    }
 +
 +    public static SSTableReader sstableWithLevel(int generation, int size, int level, ColumnFamilyStore cfs)
 +    {
 +        return sstable(generation, size, false, generation, generation, level, cfs);
 +    }
 +
+     public static SSTableReader sstableWithTimestamp(int generation, long timestamp, ColumnFamilyStore cfs)
+     {
 -        return sstable(generation, 0, false, timestamp, cfs);
++        return sstable(generation, 0, false, 0, 1000, 0, Integer.MAX_VALUE, timestamp, cfs);
+     }
+ 
 -    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
      {
 -        return sstable(generation, size, keepRef, System.currentTimeMillis() * 1000, cfs);
 +        return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
      }
  
 +    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
++    {
++        return sstable(generation, size, keepRef, firstToken, lastToken, level, Integer.MAX_VALUE, System.currentTimeMillis() * 1000, cfs);
++    }
+ 
 -    public static SSTableReader sstable(int generation, int size, boolean keepRef, long timestamp, ColumnFamilyStore cfs)
++    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, int minLocalDeletionTime, long timestamp, ColumnFamilyStore cfs)
      {
          Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                 cfs.keyspace.getName(),
@@@ -123,40 -115,37 +133,41 @@@
              {
              }
          }
 -        SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(tempFile), RandomAccessReader.DEFAULT_BUFFER_SIZE, size);
 -        if (size > 0)
 +        // .complete() with size to make sstable.onDiskLength work
 +        try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size);
 +             FileHandle fileHandle = builder.complete(size))
          {
 -            try
 +            if (size > 0)
              {
 -                File file = new File(descriptor.filenameFor(Component.DATA));
 -                try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                try
                  {
 -                    raf.setLength(size);
 +                    File file = new File(descriptor.filenameFor(Component.DATA));
 +                    try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
 +                    {
 +                        raf.setLength(size);
 +                    }
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new RuntimeException(e);
                  }
              }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 +            SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
-             StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
-                     .sstableLevel(level)
-                     .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
-                     .get(MetadataType.STATS);
++            MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator);
++            collector.update(new DeletionTime(timestamp, minLocalDeletionTime));
++            StatsMetadata metadata = (StatsMetadata) collector.sstableLevel(level)
++                                                              .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
++                                                              .get(MetadataType.STATS);
 +            SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 +                    fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(),
 +                    new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
 +            reader.first = readerBounds(firstToken);
 +            reader.last = readerBounds(lastToken);
 +            if (!keepRef)
 +                reader.selfRef().release();
 +            return reader;
          }
 -        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
 -        MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator);
 -        collector.update(new DeletionTime(timestamp, (int) (System.currentTimeMillis() / 1000)));
 -        StatsMetadata metadata = (StatsMetadata) collector.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(),
 -                                                                            0.01f,
 -                                                                            -1,
 -                                                                            header).get(MetadataType.STATS);
 -
 -        SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
 -                                                          segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
 -                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
 -        reader.first = reader.last = readerBounds(generation);
 -        if (!keepRef)
 -            reader.selfRef().release();
 -        return reader;
 +
      }
  
      public static ColumnFamilyStore newCFS()
diff --cc test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index ee7952bde4,9bed7c1c94..15d2a2e705
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@@ -25,8 -26,10 +26,9 @@@ import java.util.Map
  import java.util.concurrent.TimeUnit;
  
  import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
  
 -
  import org.junit.BeforeClass;
  import org.junit.Test;
  import static org.junit.Assert.assertEquals;
@@@ -46,7 -48,8 +47,8 @@@ import org.apache.cassandra.db.RowUpdat
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.MockSchema;
 +import org.apache.cassandra.utils.Pair;
  
  import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
  import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
@@@ -307,67 -282,24 +309,88 @@@ public class TimeWindowCompactionStrate
          t.transaction.abort();
      }
  
 +    @Test
 +    public void testDropOverlappingExpiredSSTables() throws InterruptedException
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 +        cfs.truncateBlocking();
 +        cfs.disableAutoCompaction();
 +
 +        long timestamp = System.currentTimeMillis();
 +        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
 +
 +        // Create a expiring sstable with a TTL
 +        DecoratedKey key = Util.dk("expired");
 +        new RowUpdateBuilder(cfs.metadata, timestamp, TTL_SECONDS, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +
 +        cfs.forceBlockingFlush();
 +        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
 +        Thread.sleep(10);
 +
 +        // Create a second sstable without TTL and with a row superceded by the expiring row
 +        new RowUpdateBuilder(cfs.metadata, timestamp - 1000, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +        key = Util.dk("nonexpired");
 +        new RowUpdateBuilder(cfs.metadata, timestamp, key.getKey())
 +            .clustering("column")
 +            .add("val", value).build().applyUnsafe();
 +
 +        cfs.forceBlockingFlush();
 +        assertEquals(cfs.getLiveSSTables().size(), 2);
 +
 +        Map<String, String> options = new HashMap<>();
 +        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
 +        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
 +        options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
 +        options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
 +        TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +            twcs.addSSTable(sstable);
 +
 +        twcs.startup();
 +        assertNull(twcs.getNextBackgroundTask(nowInSeconds()));
 +
 +        // Wait for the expiration of the first sstable
 +        Thread.sleep(TimeUnit.SECONDS.toMillis(TTL_SECONDS + 1));
 +        assertNull(twcs.getNextBackgroundTask(nowInSeconds()));
 +
 +        options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "true");
 +        twcs = new TimeWindowCompactionStrategy(cfs, options);
 +        for (SSTableReader sstable : cfs.getLiveSSTables())
 +            twcs.addSSTable(sstable);
 +
 +        twcs.startup();
 +        AbstractCompactionTask t = twcs.getNextBackgroundTask(nowInSeconds());
 +        assertNotNull(t);
 +        assertEquals(1, Iterables.size(t.transaction.originals()));
 +        SSTableReader sstable = t.transaction.originals().iterator().next();
 +        assertEquals(sstable, expiredSSTable);
 +        twcs.shutdown();
 +        t.transaction.abort();
 +    }
++
+     @Test
+     public void testGroupForAntiCompaction()
+     {
+         ColumnFamilyStore cfs = MockSchema.newCFS("test_group_for_anticompaction");
+         cfs.setCompactionParameters(ImmutableMap.of("class", "TimeWindowCompactionStrategy",
+                                                     "timestamp_resolution", "MILLISECONDS",
+                                                     "compaction_window_size", "1",
+                                                     "compaction_window_unit", "MINUTES"));
+ 
+         List<SSTableReader> sstables = new ArrayList<>(10);
+         long curr = System.currentTimeMillis();
+         for (int i = 0; i < 10; i++)
+             sstables.add(MockSchema.sstableWithTimestamp(i, curr + TimeUnit.MILLISECONDS.convert(i, TimeUnit.MINUTES), cfs));
+ 
+         cfs.addSSTables(sstables);
 -        Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getStrategies().get(1).groupSSTablesForAntiCompaction(sstables);
++        Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getCompactionStrategyFor(sstables.get(0)).groupSSTablesForAntiCompaction(sstables);
+         assertTrue(groups.size() > 0);
+         for (Collection<SSTableReader> group : groups)
+             assertEquals(1, group.size());
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org