You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/09/06 11:17:05 UTC
[cassandra] branch trunk updated: Add a flag to upgradesstables to
allow only upgrading sstables older than a certain timestamp and recompress
command
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new cfc402d Add a flag to upgradesstables to allow only upgrading sstables older than a certain timestamp and recompress command
cfc402d is described below
commit cfc402d26a628bbc9e005c370f5707f4985207aa
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Aug 3 14:03:37 2021 +0200
Add a flag to upgradesstables to allow only upgrading sstables older than a certain timestamp and recompress command
Patch by Alex Petrov, reviewed by Marcus Eriksson for CASSANDRA-16837.
---
.../org/apache/cassandra/db/ColumnFamilyStore.java | 15 ++-
.../cassandra/db/compaction/CompactionManager.java | 36 +++++-
.../apache/cassandra/service/StorageService.java | 26 +++-
.../cassandra/service/StorageServiceMBean.java | 9 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 82 ++++++------
src/java/org/apache/cassandra/tools/NodeTool.java | 138 ++++++++++-----------
...UpgradeSSTable.java => RecompressSSTables.java} | 11 +-
.../cassandra/tools/nodetool/UpgradeSSTable.java | 11 +-
.../distributed/test/UpgradeSSTablesTest.java | 119 ++++++++++++++++++
9 files changed, 321 insertions(+), 126 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9f9964..05161d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1568,9 +1568,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return CompactionManager.instance.performVerify(ColumnFamilyStore.this, options);
}
- public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
+ /**
+ * Rewrites all SSTables according to specified parameters
+ *
+ * @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link BigFormat#latestVersion})
+ * @param skipIfNewerThanTimestamp - max timestamp (local creation time) for SSTable; SSTables created _after_ this timestamp will be excluded from compaction
+ * @param skipIfCompressionMatches - if {@link true}, will rewrite only SSTables whose compression parameters are different from {@link CFMetaData#compressionParams()}
+ * @param jobs number of jobs for parallel execution
+ */
+ public CompactionManager.AllSSTableOpStatus sstablesRewrite(final boolean skipIfCurrentVersion,
+ final long skipIfNewerThanTimestamp,
+ final boolean skipIfCompressionMatches,
+ final int jobs) throws ExecutionException, InterruptedException
{
- return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs);
+ return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
}
public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fbfbbf3..c3e4f59 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
@@ -479,7 +480,38 @@ public class CompactionManager implements CompactionManagerMBean
}, 0, OperationType.VERIFY);
}
- public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
+ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
+ final boolean skipIfCurrentVersion,
+ final long skipIfOlderThanTimestamp,
+ final boolean skipIfCompressionMatches,
+ int jobs) throws InterruptedException, ExecutionException
+ {
+ return performSSTableRewrite(cfs, (sstable) -> {
+ // Skip if descriptor version matches current version
+ if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
+ return false;
+
+ // Skip if SSTable creation time is past given timestamp
+ if (sstable.getCreationTimeFor(Component.DATA) > skipIfOlderThanTimestamp)
+ return false;
+
+ TableMetadata metadata = cfs.metadata.get();
+ // Skip if SSTable compression parameters match current ones
+ if (skipIfCompressionMatches &&
+ ((!sstable.compression && !metadata.params.compression.isEnabled()) ||
+ (sstable.compression && metadata.params.compression.equals(sstable.getCompressionMetadata().parameters))))
+ return false;
+
+ return true;
+ }, jobs);
+ }
+
+ /**
+ * Perform SSTable rewrite
+
+ * @param sstableFilter sstables for which predicate returns {@link false} will be excluded
+ */
+ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs) throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@@ -492,7 +524,7 @@ public class CompactionManager implements CompactionManagerMBean
while (iter.hasNext())
{
SSTableReader sstable = iter.next();
- if (excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
+ if (!sstableFilter.test(sstable))
{
transaction.cancel(sstable);
iter.remove();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 593e5fd..a976a28 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3683,12 +3683,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return upgradeSSTables(keyspaceName, excludeCurrentVersion, 0, tableNames);
}
- public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ public int upgradeSSTables(String keyspaceName,
+ final boolean skipIfCurrentVersion,
+ final long skipIfNewerThanTimestamp,
+ int jobs,
+ String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ return rewriteSSTables(keyspaceName, skipIfCurrentVersion, skipIfNewerThanTimestamp, false, jobs, tableNames);
+ }
+
+ public int recompressSSTables(String keyspaceName,
+ int jobs,
+ String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ return rewriteSSTables(keyspaceName, false, Long.MAX_VALUE, true, jobs, tableNames);
+ }
+
+
+ public int rewriteSSTables(String keyspaceName,
+ final boolean skipIfCurrentVersion,
+ final long skipIfNewerThanTimestamp,
+ final boolean skipIfCompressionMatches,
+ int jobs,
+ String... tableNames) throws IOException, ExecutionException, InterruptedException
{
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, tableNames))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion, jobs);
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4a27f84..f29dd89 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -366,7 +366,14 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
@Deprecated
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) throws IOException, ExecutionException, InterruptedException;
- public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+ @Deprecated
+ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ return upgradeSSTables(keyspaceName, excludeCurrentVersion, Long.MAX_VALUE, jobs, tableNames);
+ }
+
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+ public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
/**
* Rewrites all sstables from the given tables to remove deleted data.
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d1b7528..2f65e58 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -330,9 +330,9 @@ public class NodeProbe implements AutoCloseable
return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames);
}
- public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
+ return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames);
}
public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
@@ -340,6 +340,11 @@ public class NodeProbe implements AutoCloseable
return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames);
}
+ public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ return ssProxy.recompressSSTables(keyspaceName, jobs, tableNames);
+ }
+
private void checkJobs(PrintStream out, int jobs)
{
int compactors = ssProxy.getConcurrentCompactors();
@@ -350,64 +355,59 @@ public class NodeProbe implements AutoCloseable
public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
checkJobs(out, jobs);
- switch (forceKeyspaceCleanup(jobs, keyspaceName, tableNames))
- {
- case 1:
- failed = true;
- out.println("Aborted cleaning up at least one table in keyspace "+keyspaceName+", check server logs for more information.");
- break;
- case 2:
- failed = true;
- out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
- break;
- }
+ perform(out, keyspaceName,
+ () -> forceKeyspaceCleanup(jobs, keyspaceName, tableNames),
+ "cleaning up");
}
public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
{
checkJobs(out, jobs);
- switch (ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables))
- {
- case 1:
- failed = true;
- out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
- break;
- case 2:
- failed = true;
- out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
- break;
- }
+ perform(out, keyspaceName,
+ () -> scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables),
+ "scrubbing");
}
public void verify(PrintStream out, boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
- switch (verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames))
- {
- case 1:
- failed = true;
- out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information.");
- break;
- case 2:
- failed = true;
- out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
- break;
- }
+ perform(out, keyspaceName,
+ () -> verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames),
+ "verifying");
}
+ public void recompressSSTables(PrintStream out, String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ checkJobs(out, jobs);
+ perform(out, keyspaceName,
+ () -> recompressSSTables(keyspaceName, jobs, tableNames),
+ "recompressing sstables");
+ }
- public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
checkJobs(out, jobs);
- switch (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames))
+ perform(out, keyspaceName,
+ () -> upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames),
+ "upgrading sstables");
+ }
+
+ private static interface Job
+ {
+ int perform() throws IOException, ExecutionException, InterruptedException;
+ }
+
+ private void perform(PrintStream out, String ks, Job job, String jobName) throws IOException, ExecutionException, InterruptedException
+ {
+ switch (job.perform())
{
case 1:
- failed = true;
- out.println("Aborted upgrading sstables for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
+ out.printf("Aborted %s for at least one table in keyspace %s, check server logs for more information.\n",
+ jobName, ks);
break;
case 2:
failed = true;
- out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
- break;
+ out.printf("Failed marking some sstables compacting in keyspace %s, check server logs for more information.\n",
+ ks);
}
}
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 1f5295f..02340ba 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -92,53 +92,63 @@ public class NodeTool
public int execute(String... args)
{
List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList(
+ Assassinate.class,
CassHelp.class,
- Info.class,
- Ring.class,
- NetStats.class,
- CfStats.class,
- TableStats.class,
CfHistograms.class,
- TableHistograms.class,
+ CfStats.class,
Cleanup.class,
ClearSnapshot.class,
ClientStats.class,
Compact.class,
- Scrub.class,
- Verify.class,
- Flush.class,
- UpgradeSSTable.class,
- GarbageCollect.class,
- DisableAutoCompaction.class,
- EnableAutoCompaction.class,
- CompactionStats.class,
CompactionHistory.class,
+ CompactionStats.class,
Decommission.class,
DescribeCluster.class,
+ DescribeRing.class,
+ DisableAuditLog.class,
+ DisableAutoCompaction.class,
+ DisableBackup.class,
DisableBinary.class,
+ DisableFullQueryLog.class,
+ DisableGossip.class,
+ DisableHandoff.class,
+ DisableHintsForDC.class,
+ DisableOldProtocolVersions.class,
+ Drain.class,
+ EnableAuditLog.class,
+ EnableAutoCompaction.class,
+ EnableBackup.class,
EnableBinary.class,
+ EnableFullQueryLog.class,
EnableGossip.class,
- DisableGossip.class,
EnableHandoff.class,
- EnableFullQueryLog.class,
- DisableFullQueryLog.class,
+ EnableHintsForDC.class,
+ EnableOldProtocolVersions.class,
+ FailureDetectorInfo.class,
+ Flush.class,
+ GarbageCollect.class,
GcStats.class,
GetAuditLog.class,
GetBatchlogReplayTrottle.class,
GetCompactionThreshold.class,
GetCompactionThroughput.class,
GetConcurrency.class,
+ GetConcurrentCompactors.class,
+ GetConcurrentViewBuilders.class,
+ GetEndpoints.class,
GetFullQueryLog.class,
- GetTimeout.class,
- GetStreamThroughput.class,
- GetTraceProbability.class,
GetInterDCStreamThroughput.class,
- GetEndpoints.class,
- GetSeeds.class,
- GetSSTables.class,
+ GetLoggingLevels.class,
GetMaxHintWindow.class,
+ GetSSTables.class,
+ GetSeeds.class,
+ GetSnapshotThrottle.class,
+ GetStreamThroughput.class,
+ GetTimeout.class,
+ GetTraceProbability.class,
GossipInfo.class,
Import.class,
+ Info.class,
InvalidateCounterCache.class,
InvalidateCredentialsCache.class,
InvalidateJmxPermissionsCache.class,
@@ -148,76 +158,66 @@ public class NodeTool
InvalidateRolesCache.class,
InvalidateRowCache.class,
Join.class,
+ ListSnapshots.class,
Move.class,
+ NetStats.class,
PauseHandoff.class,
- ResumeHandoff.class,
ProfileLoad.class,
ProxyHistograms.class,
+ RangeKeySample.class,
Rebuild.class,
+ RebuildIndex.class,
+ RecompressSSTables.class,
Refresh.class,
- RemoveNode.class,
- Assassinate.class,
+ RefreshSizeEstimates.class,
+ ReloadLocalSchema.class,
ReloadSeeds.class,
- ResetFullQueryLog.class,
+ ReloadSslCertificates.class,
+ ReloadTriggers.class,
+ RelocateSSTables.class,
+ RemoveNode.class,
Repair.class,
ReplayBatchlog.class,
- SetCacheCapacity.class,
- SetConcurrency.class,
- SetHintedHandoffThrottleInKB.class,
+ ResetFullQueryLog.class,
+ ResetLocalSchema.class,
+ ResumeHandoff.class,
+ Ring.class,
+ Scrub.class,
SetBatchlogReplayThrottle.class,
+ SetCacheCapacity.class,
+ SetCacheKeysToSave.class,
SetCompactionThreshold.class,
SetCompactionThroughput.class,
- GetConcurrentCompactors.class,
+ SetConcurrency.class,
SetConcurrentCompactors.class,
- GetConcurrentViewBuilders.class,
SetConcurrentViewBuilders.class,
- SetConcurrency.class,
- SetTimeout.class,
- SetStreamThroughput.class,
+ SetHintedHandoffThrottleInKB.class,
SetInterDCStreamThroughput.class,
- SetTraceProbability.class,
+ SetLoggingLevel.class,
SetMaxHintWindow.class,
- Snapshot.class,
- ListSnapshots.class,
- GetSnapshotThrottle.class,
SetSnapshotThrottle.class,
+ SetStreamThroughput.class,
+ SetTimeout.class,
+ SetTraceProbability.class,
+ Sjk.class,
+ Snapshot.class,
Status.class,
+ StatusAutoCompaction.class,
+ StatusBackup.class,
StatusBinary.class,
StatusGossip.class,
- StatusBackup.class,
StatusHandoff.class,
- StatusAutoCompaction.class,
Stop.class,
StopDaemon.class,
- Version.class,
- DescribeRing.class,
- RebuildIndex.class,
- RangeKeySample.class,
- EnableBackup.class,
- DisableBackup.class,
- ResetLocalSchema.class,
- ReloadLocalSchema.class,
- ReloadTriggers.class,
- SetCacheKeysToSave.class,
- DisableHandoff.class,
- Drain.class,
- TruncateHints.class,
- TpStats.class,
+ TableHistograms.class,
+ TableStats.class,
TopPartitions.class,
- SetLoggingLevel.class,
- GetLoggingLevels.class,
- Sjk.class,
- DisableHintsForDC.class,
- EnableHintsForDC.class,
- FailureDetectorInfo.class,
- RefreshSizeEstimates.class,
- RelocateSSTables.class,
- ViewBuildStatus.class,
- ReloadSslCertificates.class,
- EnableAuditLog.class,
- DisableAuditLog.class,
- EnableOldProtocolVersions.class,
- DisableOldProtocolVersions.class
+ TpStats.class,
+ TruncateHints.class,
+ UpgradeSSTable.class,
+ Verify.class,
+ Version.class,
+ ViewBuildStatus.class
);
Cli.CliBuilder<NodeToolCmdRunnable> builder = Cli.builder("nodetool");
diff --git a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java b/src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
similarity index 78%
copy from src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
copy to src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
index ba1b6f5..78ed7ec 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
@@ -27,15 +27,12 @@ import java.util.List;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
-@Command(name = "upgradesstables", description = "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version)")
-public class UpgradeSSTable extends NodeToolCmd
+@Command(name = "recompress_sstables", description = "Rewrite sstables (for the requested tables) that have compression configuration different from the current")
+public class RecompressSSTables extends NodeToolCmd
{
@Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
private List<String> args = new ArrayList<>();
- @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
- private boolean includeAll = false;
-
@Option(title = "jobs",
name = {"-j", "--jobs"},
description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads")
@@ -51,7 +48,7 @@ public class UpgradeSSTable extends NodeToolCmd
{
try
{
- probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, jobs, tableNames);
+ probe.recompressSSTables(probe.output().out, keyspace, jobs, tableNames);
}
catch (Exception e)
{
@@ -59,4 +56,4 @@ public class UpgradeSSTable extends NodeToolCmd
}
}
}
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
index ba1b6f5..cc94d8b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
@@ -33,9 +33,16 @@ public class UpgradeSSTable extends NodeToolCmd
@Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
private List<String> args = new ArrayList<>();
- @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
+ @Option(title = "include_all",
+ name = {"-a", "--include-all-sstables"},
+ description = "Use -a to include all sstables, even those already on the current version")
private boolean includeAll = false;
+ @Option(title = "max_timestamp",
+ name = {"-t", "--max-timestamp"},
+ description = "Use -t to compact only SSTables that have local creation time _older_ than the given timestamp")
+ private long maxSSTableTimestamp = Long.MAX_VALUE;
+
@Option(title = "jobs",
name = {"-j", "--jobs"},
description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads")
@@ -51,7 +58,7 @@ public class UpgradeSSTable extends NodeToolCmd
{
try
{
- probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, jobs, tableNames);
+ probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, maxSSTableTimestamp, jobs, tableNames);
}
catch (Exception e)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
new file mode 100644
index 0000000..c599f17
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogAction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class UpgradeSSTablesTest extends TestBaseImpl
+{
+ @Test
+ public void rewriteSSTablesTest() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = builder().withNodes(1).withDataDirCount(1).start())
+ {
+ for (String compressionBefore : new String[]{ "{'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 32}", "{'enabled': 'false'}" })
+ {
+ for (String command : new String[]{ "upgradesstables", "recompress_sstables" })
+ {
+ cluster.schemaChange(withKeyspace("DROP KEYSPACE IF EXISTS %s"));
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"));
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) " +
+ "WITH compression = " + compressionBefore));
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ Keyspace.open(ks).getColumnFamilyStore("tbl").disableAutoCompaction();
+ }).accept(KEYSPACE);
+
+ String blob = "blob";
+ for (int i = 0; i < 6; i++)
+ blob += blob;
+
+ for (int i = 0; i < 100; i++)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+ ConsistencyLevel.QUORUM, i, i, blob);
+ }
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+ Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+ cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH compression = {'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 128};"));
+
+ Thread.sleep(2000); // Make sure timestamp will be different even with 1-second resolution.
+
+ long maxSoFar = cluster.get(1).appliesOnInstance((String ks) -> {
+ long maxTs = -1;
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ cfs.disableAutoCompaction();
+ for (SSTableReader tbl : cfs.getLiveSSTables())
+ {
+ maxTs = Math.max(maxTs, tbl.getCreationTimeFor(Component.DATA));
+ }
+ return maxTs;
+ }).apply(KEYSPACE);
+
+ for (int i = 100; i < 200; i++)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+ ConsistencyLevel.QUORUM, i, i, blob);
+ }
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+ LogAction logAction = cluster.get(1).logs();
+ logAction.mark();
+
+ long expectedCount = cluster.get(1).appliesOnInstance((String ks, Long maxTs) -> {
+ long count = 0;
+ long skipped = 0;
+ Set<SSTableReader> liveSSTables = Keyspace.open(ks).getColumnFamilyStore("tbl").getLiveSSTables();
+ assert liveSSTables.size() == 2 : String.format("Expected 2 sstables, but got " + liveSSTables.size());
+ for (SSTableReader tbl : liveSSTables)
+ {
+ if (tbl.getCreationTimeFor(Component.DATA) <= maxTs)
+ count++;
+ else
+ skipped++;
+ }
+ assert skipped > 0;
+ return count;
+ }).apply(KEYSPACE, maxSoFar);
+
+ if (command.equals("upgradesstables"))
+ Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", "-t", Long.toString(maxSoFar), KEYSPACE, "tbl"));
+ else
+ Assert.assertEquals(0, cluster.get(1).nodetool("recompress_sstables", KEYSPACE, "tbl"));
+
+ Assert.assertFalse(logAction.grep(String.format("%d sstables to", expectedCount)).getResult().isEmpty());
+ }
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org