You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jw...@apache.org on 2022/11/07 22:20:33 UTC
[cassandra] branch trunk updated: CASSANDRA-17711: Add nodetool forcecompact
This is an automated email from the ASF dual-hosted git repository.
jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 873e024a32 CASSANDRA-17711: Add nodetool forcecompact
873e024a32 is described below
commit 873e024a32d37de08550c8106a8d7fd52bda588b
Author: Cheng Wang <ch...@netflix.com>
AuthorDate: Fri Jun 17 15:53:34 2022 -0700
CASSANDRA-17711: Add nodetool forcecompact
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 29 +++
.../db/compaction/CompactionIterator.java | 12 +
.../cassandra/db/compaction/CompactionManager.java | 29 +++
.../cassandra/db/partitions/PurgeFunction.java | 13 +-
.../apache/cassandra/service/StorageService.java | 18 ++
.../cassandra/service/StorageServiceMBean.java | 7 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +
src/java/org/apache/cassandra/tools/NodeTool.java | 8 +-
.../cassandra/tools/nodetool/ForceCompact.java | 58 +++++
.../tools/nodetool/ForceCompactionTest.java | 285 +++++++++++++++++++++
11 files changed, 463 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 74055ab158..3fabee8b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC grace for given table and partition keys (CASSANDRA-17711)
* Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, CREATE ROLE and DROP ROLE (CASSANDRA-16640)
* Nodetool bootstrap resume will now return an error if the operation fails (CASSANDRA-16491)
* Disable resumable bootstrap by default (CASSANDRA-17679)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ebe4aeba8a..9fc0f775e1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -320,6 +321,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
private volatile boolean compactionSpaceCheck = true;
+ // Partition keys whose tombstones are purged during compaction regardless of gc_grace_seconds
+ private final Set<DecoratedKey> partitionKeySetIgnoreGcGrace = ConcurrentHashMap.newKeySet();
+
@VisibleForTesting
final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
private volatile ShardBoundaries cachedShardBoundaries = null;
@@ -2416,6 +2420,31 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
CompactionManager.instance.forceCompactionForKey(this, key);
}
+ /**
+  * Forces a compaction of the sstables that may contain the given partition keys and, while it runs,
+  * registers those keys so that {@link #shouldIgnoreGcGraceForKey(DecoratedKey)} returns true for them.
+  *
+  * Calls on the same table are serialized: the registered key set is shared, per-table state that is
+  * cleared when a call finishes, so an overlapping call would otherwise lose its keys while its own
+  * compaction is still running (or have them wiped by the other call's initial reset).
+  *
+  * @param partitionKeysIgnoreGcGrace partition keys in the string form understood by the table's
+  *                                   partition key type ({@code partitionKeyType.fromString})
+  */
+ public void forceCompactionKeysIgnoringGcGrace(String... partitionKeysIgnoreGcGrace)
+ {
+     // Parse and decorate every key first, so that an exception thrown for an unparsable key
+     // propagates before the shared key set has been modified.
+     List<DecoratedKey> decoratedKeys = new ArrayList<>(partitionKeysIgnoreGcGrace.length);
+     for (String key : partitionKeysIgnoreGcGrace)
+     {
+         decoratedKeys.add(decorateKey(metadata().partitionKeyType.fromString(key)));
+     }
+
+     // The set is private and its only other user, shouldIgnoreGcGraceForKey, never takes this monitor,
+     // so holding it here only makes other forced compactions of this table wait their turn.
+     // NOTE(review): assumes forceCompactionForKeys returns only once the compaction has finished,
+     // which the clear-in-finally below has always relied on -- confirm.
+     synchronized (partitionKeySetIgnoreGcGrace)
+     {
+         try
+         {
+             partitionKeySetIgnoreGcGrace.clear();
+             partitionKeySetIgnoreGcGrace.addAll(decoratedKeys);
+
+             CompactionManager.instance.forceCompactionForKeys(this, decoratedKeys);
+         }
+         finally
+         {
+             // Never leave keys registered: later compactions must honour gc_grace_seconds again
+             partitionKeySetIgnoreGcGrace.clear();
+         }
+     }
+ }
+
+ /**
+  * Tells whether the given partition key is currently registered by
+  * {@link #forceCompactionKeysIgnoringGcGrace(String...)}.
+  *
+  * @param dk the decorated partition key to look up
+  * @return true if {@code dk} is in the set of keys for which gc_grace_seconds is to be ignored
+  */
+ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
+ {
+     return partitionKeySetIgnoreGcGrace.contains(dk);
+ }
+
public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 2f79f92b84..bceb2b6a97 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -349,6 +349,18 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
updateBytesRead();
}
+ /*
+ * Called at the beginning of each new partition.
+ * Returns true if gc_grace_seconds is to be ignored for the current partition key during this
+ * compaction, as reported by the table's ColumnFamilyStore.
+ * Note that this method must be called after onNewPartition, because it depends on currentKey,
+ * which is set in onNewPartition.
+ */
+ @Override
+ protected boolean shouldIgnoreGcGrace()
+ {
+ return controller.cfs.shouldIgnoreGcGraceForKey(currentKey);
+ }
+
/*
* Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum
* timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index dc22b6712a..2e1d94e807 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1047,6 +1047,23 @@ public class CompactionManager implements CompactionManagerMBean
forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key));
}
+ /**
+  * Forces a compaction of the sstables of the given table that hold any of the given partition keys.
+  *
+  * @param cfStore the table whose sstables are compacted
+  * @param keys    the partition keys used to select the sstables
+  */
+ public void forceCompactionForKeys(ColumnFamilyStore cfStore, Collection<DecoratedKey> keys)
+ {
+     // An sstable qualifies as soon as any one of the keys may be present in it
+     com.google.common.base.Predicate<SSTableReader> mayHoldAnyKey =
+         candidate -> keys.stream().anyMatch(key -> candidate.maybePresent(key));
+
+     forceCompaction(cfStore, () -> sstablesWithKeys(cfStore, keys), mayHoldAnyKey);
+ }
+
private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key)
{
final Set<SSTableReader> sstables = new HashSet<>();
@@ -1060,6 +1077,18 @@ public class CompactionManager implements CompactionManagerMBean
return sstables.isEmpty() ? Collections.emptyList() : sstables;
}
+ /**
+  * Collects, without duplicates, the sstables that {@code sstablesWithKey} returns for each of the keys.
+  *
+  * @param cfs           the table to look the keys up in
+  * @param decoratedKeys the partition keys to look up
+  * @return the union of the per-key results; empty if no sstable matched
+  */
+ private static Collection<SSTableReader> sstablesWithKeys(ColumnFamilyStore cfs, Collection<DecoratedKey> decoratedKeys)
+ {
+     final Set<SSTableReader> union = new HashSet<>();
+     decoratedKeys.forEach(key -> union.addAll(sstablesWithKey(cfs, key)));
+     return union;
+ }
+
public void forceUserDefinedCompaction(String dataFiles)
{
String[] filenames = dataFiles.split(",");
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index 09f3ae3bbf..5d97fd36b1 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@ -31,13 +31,15 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
private final boolean enforceStrictLiveness;
private boolean isReverseOrder;
+ private boolean ignoreGcGraceSeconds;
+
public PurgeFunction(int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones,
boolean enforceStrictLiveness)
{
this.nowInSec = nowInSec;
this.purger = (timestamp, localDeletionTime) ->
!(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
- && localDeletionTime < gcBefore
+ && (localDeletionTime < gcBefore || ignoreGcGraceSeconds)
&& getPurgeEvaluator().test(timestamp);
this.enforceStrictLiveness = enforceStrictLiveness;
}
@@ -59,6 +61,13 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
{
}
+ // Called at the beginning of each new partition, right after onNewPartition(); the result applies to that whole partition.
+ // Returns true if gc_grace_seconds must be ignored for the current partition key. This default implementation never ignores it.
+ protected boolean shouldIgnoreGcGrace()
+ {
+ return false;
+ }
+
protected void setReverseOrder(boolean isReverseOrder)
{
this.isReverseOrder = isReverseOrder;
@@ -70,6 +79,8 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
{
onNewPartition(partition.partitionKey());
+ ignoreGcGraceSeconds = shouldIgnoreGcGrace();
+
setReverseOrder(partition.isReverseOrder());
UnfilteredRowIterator purged = Transformation.apply(partition, this);
if (purged.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d616233104..da130c60a0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4072,6 +4072,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ /**
+ * Forces compaction for a list of partition keys in a table.
+ * The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the compaction,
+ * in order to purge the tombstones and free up space quicker.
+ * @param keyspaceName keyspace name
+ * @param tableName table name
+ * @param partitionKeysIgnoreGcGrace partition keys ignoring the gc_grace_seconds
+ * @throws IOException on any I/O operation error
+ * @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception
+ * @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity
+ */
+ public void forceCompactionKeysIgnoringGcGrace(String keyspaceName,
+ String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException
+ {
+ ColumnFamilyStore cfStore = getValidKeyspace(keyspaceName).getColumnFamilyStore(tableName);
+ cfStore.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace);
+ }
+
/**
* Takes the snapshot for the given keyspaces. A snapshot name must be specified.
*
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 101c9a3ebf..c92ea72bd6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -364,6 +364,13 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+ /**
+ * Forces compaction for a list of partition keys on a table.
+ * The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the compaction,
+ * in order to purge the tombstones and free up space quicker.
+ *
+ * @param keyspaceName keyspace name
+ * @param tableName table name
+ * @param partitionKeysIgnoreGcGrace partition keys for which the gc_grace_seconds is ignored
+ */
+ public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException;
+
/**
* Trigger a cleanup of keys on a single keyspace
*/
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 7abede2ca8..e296d39723 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -460,6 +460,11 @@ public class NodeProbe implements AutoCloseable
ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, partitionKey, tableNames);
}
+ /**
+  * Asks the node, through the storage service proxy, to force a compaction of the given partition keys
+  * of one table while ignoring gc_grace_seconds for them. All arguments are passed through unchanged.
+  *
+  * @param keyspaceName keyspace name
+  * @param tableName table name
+  * @param partitionKeysIgnoreGcGrace partition keys for which the gc_grace_seconds is ignored
+  */
+ public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException
+ {
+ ssProxy.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, partitionKeysIgnoreGcGrace);
+ }
+
public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 8d87c88906..5dca8eda73 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -225,7 +225,8 @@ public class NodeTool
UpgradeSSTable.class,
Verify.class,
Version.class,
- ViewBuildStatus.class
+ ViewBuildStatus.class,
+ ForceCompact.class
);
Cli.CliBuilder<NodeToolCmdRunnable> builder = Cli.builder("nodetool");
@@ -484,6 +485,11 @@ public class NodeTool
{
return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
}
+
+ protected String[] parsePartitionKeys(List<String> cmdArgs)
+ {
+ return cmdArgs.size() <= 2 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(2, cmdArgs.size()), String.class);
+ }
}
public static SortedMap<String, SetHostStatWithPort> getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java
new file mode 100644
index 0000000000..99265e7bf0
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * The {@code nodetool forcecompact} command: forces a compaction of the given partition keys of one
+ * table, asking the node to ignore gc_grace_seconds for those keys.
+ */
+@Command(name = "forcecompact", description = "Force a (major) compaction on a table")
+public class ForceCompact extends NodeToolCmd
+{
+    @Arguments(usage = "[<keyspace> <table> <keys>]", description = "The keyspace, table, and a list of partition keys ignoring the gc_grace_seconds")
+    private List<String> args = new ArrayList<>();
+
+    /**
+     * Validates the argument count and forwards the request to the node.
+     *
+     * @param probe the connection to the node
+     * @throws IllegalArgumentException if fewer than three arguments were given
+     * @throws RuntimeException wrapping any exception raised by the probe call
+     */
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        // Keyspace, table and at least one partition key are all required
+        checkArgument(args.size() >= 3, "forcecompact requires keyspace, table and keys args");
+
+        // The keyspace and table names are not validated here; the lower-level APIs are relied
+        // upon to check them and throw if either is invalid
+        final String keyspace = args.get(0);
+        final String table = args.get(1);
+        final String[] keys = parsePartitionKeys(args);
+
+        try
+        {
+            probe.forceCompactionKeysIgnoringGcGrace(keyspace, table, keys);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Error occurred during compaction keys", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java
new file mode 100644
index 0000000000..04d369ec6d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class ForceCompactionTest extends CQLTester
+{
+ private final static int NUM_PARTITIONS = 10;
+ private final static int NUM_ROWS = 100;
+
+ @Before
+ public void setup() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key text, c1 text, c2 text, c3 text, PRIMARY KEY (key, c1))");
+
+ for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; partitionCount++)
+ {
+ for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++)
+ {
+ execute("INSERT INTO %s (key, c1, c2, c3) VALUES (?, ?, ?, ?)",
+ "k" + partitionCount, "c1_" + rowCount, "c2_" + rowCount, "c3_" + rowCount);
+ }
+ }
+
+ // Disable auto compaction
+ // NOTE: We can only disable the auto compaction once the table is created because the setting is on
+ // the table level. And we don't need to re-enable it back because the table will be dropped after the test.
+ disableCompaction();
+ }
+
+ @Test
+ public void forceCompactPartitionTombstoneTest() throws Throwable
+ {
+ String keyToPurge = "k0";
+
+ testHelper("DELETE FROM %s WHERE key = ?", keyToPurge);
+ }
+
+ @Test
+ public void forceCompactMultiplePartitionsTombstoneTest() throws Throwable
+ {
+ List<String> keysToPurge = new ArrayList<>();
+ Random rand = new Random();
+
+ int numPartitionsToPurge = 1 + rand.nextInt(NUM_PARTITIONS);
+ for (int count = 0; count < numPartitionsToPurge; count++)
+ {
+ String key = "k" + rand.nextInt(NUM_PARTITIONS);
+
+ execute("DELETE FROM %s WHERE key = ?", key);
+ keysToPurge.add(key);
+ }
+
+ flush();
+
+ String[] keys = new String[keysToPurge.size()];
+ keys = keysToPurge.toArray(keys);
+ forceCompact(keys);
+
+ verifyNotContainsTombstones();
+ }
+
+ @Test
+ public void forceCompactRowTombstoneTest() throws Throwable
+ {
+ String keyToPurge = "k0";
+
+ testHelper("DELETE FROM %s WHERE key = ? AND c1 = 'c1_0'", keyToPurge);
+ }
+
+ @Test
+ public void forceCompactMultipleRowsTombstoneTest() throws Throwable
+ {
+ List<String> keysToPurge = new ArrayList<>();
+
+ Random randPartition = new Random();
+ Random randRow = new Random();
+
+ for (int count = 0; count < 10; count++)
+ {
+ String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS);
+ String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS);
+
+ execute("DELETE FROM %s WHERE key = ? AND c1 = ?", partitionKey, clusteringKey);
+ keysToPurge.add(partitionKey);
+ }
+
+ flush();
+
+ String[] keys = new String[keysToPurge.size()];
+ keys = keysToPurge.toArray(keys);
+ forceCompact(keys);
+
+ verifyNotContainsTombstones();
+ }
+
+ @Test
+ public void forceCompactCellTombstoneTest() throws Throwable
+ {
+ String keyToPurge = "k0";
+ testHelper("DELETE c2 FROM %s WHERE key = ? AND c1 = 'c1_0'", keyToPurge);
+ }
+
+ @Test
+ public void forceCompactMultipleCellsTombstoneTest() throws Throwable
+ {
+ List<String> keysToPurge = new ArrayList<>();
+
+ Random randPartition = new Random();
+ Random randRow = new Random();
+
+ for (int count = 0; count < 10; count++)
+ {
+ String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS);
+ String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS);
+
+ execute("DELETE c2, c3 FROM %s WHERE key = ? AND c1 = ?", partitionKey, clusteringKey);
+ keysToPurge.add(partitionKey);
+ }
+
+ flush();
+
+ String[] keys = new String[keysToPurge.size()];
+ keys = keysToPurge.toArray(keys);
+ forceCompact(keys);
+
+ verifyNotContainsTombstones();
+ }
+
+ @Test
+ public void forceCompactUpdateCellTombstoneTest() throws Throwable
+ {
+ String keyToPurge = "k0";
+ testHelper("UPDATE %s SET c2 = null WHERE key = ? AND c1 = 'c1_0'", keyToPurge);
+ }
+
+ @Test
+ public void forceCompactTTLExpiryTest() throws Throwable
+ {
+ int ttlSec = 2;
+ String keyToPurge = "k0";
+
+ execute("UPDATE %s USING TTL ? SET c2 = 'bbb' WHERE key = ? AND c1 = 'c1_0'", ttlSec, keyToPurge);
+
+ flush();
+
+ // Wait until the TTL has been expired
+ // NOTE: we double the wait time of the ttl to be on the safer side and avoid the flakiness of the test
+ Thread.sleep(ttlSec * 1000 * 2);
+
+ String[] keysToPurge = new String[]{keyToPurge};
+ forceCompact(keysToPurge);
+
+ verifyNotContainsTombstones();
+ }
+
+ @Test
+ public void forceCompactCompositePartitionKeysTest() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key1 text, key2 text, c1 text, c2 text, PRIMARY KEY ((key1, key2), c1))");
+
+ for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; partitionCount++)
+ {
+ for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++)
+ {
+ execute("INSERT INTO %s (key1, key2, c1, c2) VALUES (?, ?, ?, ?)",
+ "k1_" + partitionCount, "k2_" + partitionCount, "c1_" + rowCount, "c2_" + rowCount);
+ }
+ }
+
+ // Disable auto compaction
+ // NOTE: We can only disable the auto compaction once the table is created because the setting is on
+ // the table level. And we don't need to re-enable it back because the table will be dropped after the test.
+ disableCompaction();
+
+ String keyToPurge = "k1_0:k2_0";
+
+ execute("DELETE FROM %s WHERE key1 = 'k1_0' and key2 = 'k2_0'");
+
+ flush();
+
+ String[] keysToPurge = new String[]{keyToPurge};
+ forceCompact(keysToPurge);
+
+ verifyNotContainsTombstones();
+ }
+
+ /**
+  * Runs the given statement against one partition key, flushes, force-compacts that key and then
+  * checks that no tombstone is left.
+  *
+  * @param cqlStatement statement taking the partition key as its only bind value
+  * @param keyToPurge   the partition key to bind and to force-compact
+  */
+ private void testHelper(String cqlStatement, String keyToPurge) throws Throwable
+ {
+     execute(cqlStatement, keyToPurge);
+     flush();
+
+     forceCompact(new String[]{ keyToPurge });
+     verifyNotContainsTombstones();
+ }
+
+ /**
+  * Force-compacts the given partition keys of the current table, ignoring gc_grace_seconds for them.
+  *
+  * @param partitionKeysIgnoreGcGrace the partition keys to compact
+  */
+ private void forceCompact(String[] partitionKeysIgnoreGcGrace)
+ {
+     ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+     // Fail right here rather than silently skipping the compaction: a skipped compaction would
+     // only show up later as an unrelated-looking tombstone assertion failure.
+     assertNotNull("No current table to force-compact", cfs);
+
+     cfs.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace);
+ }
+
+ /**
+  * Asserts that the current table consists of exactly one live sstable and that this sstable holds no
+  * tombstone: no partition-level deletion, no range tombstone marker, no row deletion and no
+  * tombstone cell.
+  */
+ private void verifyNotContainsTombstones()
+ {
+     ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+     Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+
+     // Callers must always run the compaction before calling this, leaving a single sstable
+     assertEquals("Unexpected number of live sstables", 1, sstables.size());
+
+     SSTableReader sstable = sstables.iterator().next();
+     try (ISSTableScanner scanner = sstable.getScanner())
+     {
+         while (scanner.hasNext())
+         {
+             try (UnfilteredRowIterator partition = scanner.next())
+             {
+                 // Partition should be all alive
+                 assertTrue(partition.partitionLevelDeletion().isLive());
+
+                 while (partition.hasNext())
+                 {
+                     Unfiltered atom = partition.next();
+
+                     // Anything that is not a row is a range tombstone marker
+                     assertTrue("Unexpected range tombstone marker", atom.isRow());
+
+                     // Row should be all alive
+                     Row row = (Row) atom;
+                     assertTrue(row.deletion().isLive());
+
+                     // Cells should be alive as well
+                     for (Cell<?> cell : row.cells())
+                     {
+                         assertFalse(cell.isTombstone());
+                     }
+                 }
+             }
+         }
+     }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org