Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/02/16 10:21:44 UTC

[cassandra] branch cassandra-3.0 updated: Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 85c202d  Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction
85c202d is described below

commit 85c202d8a8b037791db08c531f4f3c8336c82696
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Fri Jan 21 09:02:10 2022 +0100

    Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction
    
    Patch by marcuse; reviewed by Caleb Rackliffe for CASSANDRA-17273
---
 CHANGES.txt                                        |  2 +-
 .../org/apache/cassandra/db/lifecycle/LogFile.java | 23 +++++----
 .../cassandra/db/lifecycle/LogTransactionTest.java | 58 ++++++++++++++++++----
 3 files changed, 64 insertions(+), 19 deletions(-)
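
In short: transaction log replicas are created lazily, one per data
directory, the first time a record touches an sstable in that directory.
Before this patch a late-created replica was seeded from the live `records`
set; since untrackNew can remove entries from `records` while earlier
replicas keep the corresponding line on disk, replicas in different
directories could end up with different contents. The patch seeds new
replicas from the new append-only `onDiskRecords` set, so every replica is
born byte-identical to the ones already written.

A minimal standalone sketch of the mechanism (hypothetical class, plain Java
collections standing in for the actual LogFile/LogReplicaSet machinery):

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    // Replicas modelled as in-memory lists of record lines, one per datadir.
    public class ReplicaDivergenceSketch
    {
        // Live record set: untrackNew can remove entries again.
        private final Set<String> records = new LinkedHashSet<>();
        // Records actually appended to disk: append-only, never shrinks.
        private final Set<String> onDiskRecords = new LinkedHashSet<>();
        // The log file replicas created so far.
        private final List<List<String>> replicas = new ArrayList<>();

        void append(String record)
        {
            records.add(record);
            onDiskRecords.add(record);
            for (List<String> replica : replicas) // every existing replica gets the line
                replica.add(record);
        }

        void untrack(String record)
        {
            records.remove(record); // in-memory only; disk content is untouched
        }

        // A new replica is seeded from the records passed in; pre-patch this
        // was `records`, post-patch it is `onDiskRecords`.
        List<String> createReplica(Iterable<String> seed)
        {
            List<String> replica = new ArrayList<>();
            for (String record : seed)
                replica.add(record);
            replicas.add(replica);
            return replica;
        }

        public static void main(String[] args)
        {
            ReplicaDivergenceSketch txn = new ReplicaDivergenceSketch();
            List<String> datadir1 = txn.createReplica(txn.records); // first replica
            txn.append("ADD:sstable1");  // trackNew: appended to datadir1's replica
            txn.untrack("ADD:sstable1"); // untrackNew: gone from records, still on disk

            List<String> buggy = txn.createReplica(txn.records);       // pre-patch seed
            List<String> fixed = txn.createReplica(txn.onDiskRecords); // post-patch seed

            System.out.println(datadir1.equals(buggy)); // false: replicas diverge
            System.out.println(datadir1.equals(fixed)); // true: replicas identical
        }
    }

Keeping a separate append-only set (rather than forbidding removal from
`records`) preserves the existing in-memory semantics of untrackNew while
making the seed for any later replica deterministic.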

diff --git a/CHANGES.txt b/CHANGES.txt
index f0fefb2..527450d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
 3.0.27
-
+ * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273)
 
 3.0.26
  * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index ac64f13..42d81ca 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -67,6 +67,9 @@ final class LogFile implements AutoCloseable
 
     // The transaction records, this set must be ORDER PRESERVING
     private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+    // the transaction records we have written to disk - used to guarantee that the
+    // on-disk log files become identical when creating a new replica
+    private final LinkedHashSet<LogRecord> onDiskRecords = new LinkedHashSet<>();
 
     // The type of the transaction
     private final OperationType type;
@@ -320,17 +323,13 @@ final class LogFile implements AutoCloseable
         assert type == Type.ADD || type == Type.REMOVE;
 
         for (SSTableReader sstable : tables)
-        {
-            File folder = sstable.descriptor.directory;
-            replicas.maybeCreateReplica(folder, getFileName(folder), records);
-        }
+            maybeCreateReplica(sstable);
         return LogRecord.make(type, tables);
     }
 
     private LogRecord makeAddRecord(SSTable table)
     {
-        File folder = table.descriptor.directory;
-        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        maybeCreateReplica(table);
         return LogRecord.make(Type.ADD, table);
     }
 
@@ -342,12 +341,17 @@ final class LogFile implements AutoCloseable
     private LogRecord makeRecord(Type type, SSTable table, LogRecord record)
     {
         assert type == Type.ADD || type == Type.REMOVE;
-
-        File folder = table.descriptor.directory;
-        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        maybeCreateReplica(table);
         return record.asType(type);
     }
 
+    private void maybeCreateReplica(SSTable sstable)
+    {
+        File folder = sstable.descriptor.directory;
+        String fileName = getFileName(folder);
+        replicas.maybeCreateReplica(folder, fileName, onDiskRecords);
+    }
+
     void addRecord(LogRecord record)
     {
         if (completed())
@@ -359,6 +363,7 @@ final class LogFile implements AutoCloseable
         replicas.append(record);
         if (!records.add(record))
             throw new IllegalStateException("Failed to add record");
+        onDiskRecords.add(record);
     }
 
     void remove(SSTable table)
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 7ba1c39..6fb2334 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -22,7 +22,13 @@ import java.io.IOError;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -33,19 +39,14 @@ import com.google.common.collect.Sets;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import junit.framework.Assert;
 import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@ -59,6 +60,14 @@ import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class LogTransactionTest extends AbstractTransactionalTest
 {
     private static final String KEYSPACE = "TransactionLogsTest";
@@ -223,6 +232,37 @@ public class LogTransactionTest extends AbstractTransactionalTest
     }
 
     @Test
+    public void testUntrackIdenticalLogFilesOnDisk() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        File datadir1 = Files.createTempDirectory("datadir1").toFile();
+        File datadir2 = Files.createTempDirectory("datadir2").toFile();
+        SSTableReader sstable1 = sstable(datadir1, cfs, 1, 128);
+        SSTableReader sstable2 = sstable(datadir2, cfs, 1, 128);
+
+
+        for (Consumer<LogTransaction> c : Arrays.<Consumer<LogTransaction>>asList((log) -> log.trackNew(sstable2),
+                                                                                  (log) -> log.obsoleted(sstable2),
+                                                                                  (log) -> log.txnFile().addAll(LogRecord.Type.ADD, Collections.singleton(sstable2))))
+        {
+            try (LogTransaction log = new LogTransaction(OperationType.COMPACTION))
+            {
+                log.trackNew(sstable1); // creates a log file in datadir1
+                log.untrackNew(sstable1); // removes sstable1 from `records`, but still on disk & in `onDiskRecords`
+
+                c.accept(log);  // creates a log file in datadir2, based on contents in onDiskRecords
+                byte[] log1 = Files.readAllBytes(log.logFiles().get(0).toPath());
+                byte[] log2 = Files.readAllBytes(log.logFiles().get(1).toPath());
+                assertArrayEquals(log1, log2);
+            }
+        }
+        sstable1.selfRef().release();
+        sstable2.selfRef().release();
+        Thread.sleep(1);
+        LogTransaction.waitForDeletions();
+    }
+
+    @Test
     public void testCommitSameDesc() throws Throwable
     {
         ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);

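The new testUntrackIdenticalLogFilesOnDisk test pins the invariant down
directly: after trackNew/untrackNew against one data directory, any
operation that lazily creates a replica in a second directory must produce
a file whose bytes match the first. A standalone sketch of that assertion
(placeholder paths; the test reads the real files via log.logFiles()):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Arrays;

    // Minimal helper mirroring the test's check: transaction log replicas
    // in different data directories must be byte-for-byte identical.
    public final class ReplicaCheck
    {
        static boolean replicasIdentical(Path first, Path second) throws IOException
        {
            return Arrays.equals(Files.readAllBytes(first), Files.readAllBytes(second));
        }

        public static void main(String[] args) throws IOException
        {
            // Placeholder paths standing in for the two replicas of one txn log.
            Path replica1 = Paths.get("/data1/ks/tbl/txn_compaction.log");
            Path replica2 = Paths.get("/data2/ks/tbl/txn_compaction.log");
            System.out.println(replicasIdentical(replica1, replica2));
        }
    }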