You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/09/30 12:16:28 UTC

[cassandra] branch cassandra-3.0 updated: Do not release new SSTables in offline transactions

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 3e6faca  Do not release new SSTables in offline transactions
3e6faca is described below

commit 3e6faca572a5ca1de5906b39b8c0a6bf4deb40e9
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Thu Sep 30 13:01:02 2021 +0100

    Do not release new SSTables in offline transactions
    
    patch by Aleksandr Sorokoumov; reviewed by Andrés de la Peña and Branimir Lambov for CASSANDRA-16975
---
 CHANGES.txt                                        |  1 +
 .../cassandra/db/compaction/CompactionTask.java    | 20 ++---
 .../db/compaction/CompactionTaskTest.java          | 91 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 12 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cac42fb..6ef52e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Do not release new SSTables in offline transactions (CASSANDRA-16975)
  * ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
  * CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
  * Avoid useless SSTable reads during single partition queries (CASSANDRA-16944)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3437de7..d29d5e6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -228,18 +228,14 @@ public class CompactionTask extends AbstractCompactionTask
                 newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 
             if (offline)
-            {
-                Refs.release(Refs.selfRefs(newSStables));
-            }
-            else
-            {
-                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-                Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
-                logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                           taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
-                logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-                logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
-            }
+                return;
+
+            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+            Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
+            logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                       taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
+            logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+            logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
         }
     }
 
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
new file mode 100644
index 0000000..5602a08
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class CompactionTaskTest
+{
+    private static ColumnFamilyStore cfs; // store of table ks.tbl; assigned once in setUpClass()
+    /** Prepares the server through SchemaLoader, creates keyspace "ks" with table "tbl", and caches the table's store in cfs. */
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        CFMetaData table = CFMetaData.compile("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "ks");
+        SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), table);
+        // Look the store up by the cfId of the table that was just created.
+        cfs = Schema.instance.getColumnFamilyStoreInstance(table.cfId);
+    }
+    /** Runs before each test: enables the compaction strategy manager (testOfflineCompaction disables it) and truncates the table. */
+    @Before
+    public void setUp() throws Exception
+    {
+        cfs.getCompactionStrategyManager().enable();
+        cfs.truncateBlocking();
+    }
+    /** Runs a CompactionTask over four flushed SSTables in an offline transaction and checks that the single resulting SSTable can still be referenced. */
+    @Test
+    public void testOfflineCompaction()
+    {
+        cfs.getCompactionStrategyManager().disable(); // NOTE(review): presumably keeps background compaction away from the SSTables flushed below - confirm
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);");
+        cfs.forceBlockingFlush();
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);");
+        cfs.forceBlockingFlush();
+        // One insert + flush per row above: the test expects four live SSTables at this point.
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+        Assert.assertEquals(4, sstables.size());
+        // Execute the compaction task over those SSTables inside an offline transaction.
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
+        {
+            Assert.assertEquals(4, txn.tracker.getView().liveSSTables().size());
+            CompactionTask task = new CompactionTask(cfs, txn, 1000);
+            task.execute(null);
+
+            // Check that the new SSTable was not released: the transaction's tracker view holds exactly one SSTable and tryRef() on it returns non-null
+            Assert.assertEquals(1, txn.tracker.getView().liveSSTables().size());
+            SSTableReader newSSTable = txn.tracker.getView().liveSSTables().iterator().next();
+            Assert.assertNotNull(newSSTable.tryRef());
+        }
+        finally
+        {
+            // The SSTables were compacted offline; CFS didn't notice that, so we have to remove the originals from its tracker manually
+            cfs.getTracker().removeUnsafe(sstables);
+        }
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org