You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/09/30 12:16:28 UTC
[cassandra] branch cassandra-3.0 updated: Do not release new SSTables in offline transactions
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 3e6faca Do not release new SSTables in offline transactions
3e6faca is described below
commit 3e6faca572a5ca1de5906b39b8c0a6bf4deb40e9
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Thu Sep 30 13:01:02 2021 +0100
Do not release new SSTables in offline transactions
patch by Aleksandr Sorokoumov; reviewed by Andrés de la Peña and Branimir Lambov for CASSANDRA-16975
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionTask.java | 20 ++---
.../db/compaction/CompactionTaskTest.java | 91 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 12 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index cac42fb..6ef52e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Do not release new SSTables in offline transactions (CASSANDRA-16975)
* ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
* CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
* Avoid useless SSTable reads during single partition queries (CASSANDRA-16944)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3437de7..d29d5e6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -228,18 +228,14 @@ public class CompactionTask extends AbstractCompactionTask
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
if (offline)
- {
- Refs.release(Refs.selfRefs(newSStables));
- }
- else
- {
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
- logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
- logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
- }
+ return;
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
+ logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
+ logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
}
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
new file mode 100644
index 0000000..5602a08
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class CompactionTaskTest
+{
+ private static ColumnFamilyStore cfs;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ CFMetaData table = CFMetaData.compile("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "ks");
+ SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), table);
+
+ cfs = Schema.instance.getColumnFamilyStoreInstance(table.cfId);
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ cfs.getCompactionStrategyManager().enable();
+ cfs.truncateBlocking();
+ }
+
+ @Test
+ public void testOfflineCompaction()
+ {
+ cfs.getCompactionStrategyManager().disable();
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);");
+ cfs.forceBlockingFlush();
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (2, 2);");
+ cfs.forceBlockingFlush();
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (3, 3);");
+ cfs.forceBlockingFlush();
+ QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (4, 4);");
+ cfs.forceBlockingFlush();
+
+ Set<SSTableReader> sstables = cfs.getLiveSSTables();
+ Assert.assertEquals(4, sstables.size());
+
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
+ {
+ Assert.assertEquals(4, txn.tracker.getView().liveSSTables().size());
+ CompactionTask task = new CompactionTask(cfs, txn, 1000);
+ task.execute(null);
+
+ // Check that new SSTable was not released
+ Assert.assertEquals(1, txn.tracker.getView().liveSSTables().size());
+ SSTableReader newSSTable = txn.tracker.getView().liveSSTables().iterator().next();
+ Assert.assertNotNull(newSSTable.tryRef());
+ }
+ finally
+ {
+ // SSTables were compacted offline; CFS didn't notice that, so we have to remove them manually
+ cfs.getTracker().removeUnsafe(sstables);
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org