You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/09/06 11:17:05 UTC

[cassandra] branch trunk updated: Add a flag to upgradesstables to allow only upgrading sstables older than a certain timestamp and recompress command

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfc402d  Add a flag to upgradesstables to allow only upgrading sstables older than a certain timestamp and recompress command
cfc402d is described below

commit cfc402d26a628bbc9e005c370f5707f4985207aa
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Aug 3 14:03:37 2021 +0200

    Add a flag to upgradesstables to allow only upgrading sstables older than a certain timestamp and recompress command
    
    Patch by Alex Petrov, reviewed by Marcus Eriksson for CASSANDRA-16837.
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  15 ++-
 .../cassandra/db/compaction/CompactionManager.java |  36 +++++-
 .../apache/cassandra/service/StorageService.java   |  26 +++-
 .../cassandra/service/StorageServiceMBean.java     |   9 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |  82 ++++++------
 src/java/org/apache/cassandra/tools/NodeTool.java  | 138 ++++++++++-----------
 ...UpgradeSSTable.java => RecompressSSTables.java} |  11 +-
 .../cassandra/tools/nodetool/UpgradeSSTable.java   |  11 +-
 .../distributed/test/UpgradeSSTablesTest.java      | 119 ++++++++++++++++++
 9 files changed, 321 insertions(+), 126 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9f9964..05161d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1568,9 +1568,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return CompactionManager.instance.performVerify(ColumnFamilyStore.this, options);
     }
 
-    public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
+    /**
+     * Rewrites all SSTables according to specified parameters
+     *
+     * @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link BigFormat#latestVersion})
+     * @param skipIfNewerThanTimestamp - max timestamp (local creation time) for SSTable; SSTables created _after_ this timestamp will be excluded from compaction
+     * @param skipIfCompressionMatches - if {@link true}, will rewrite only SSTables whose compression parameters are different from {@link CFMetaData#compressionParams()}
+     * @param jobs number of jobs for parallel execution
+     */
+    public CompactionManager.AllSSTableOpStatus sstablesRewrite(final boolean skipIfCurrentVersion,
+                                                                final long skipIfNewerThanTimestamp,
+                                                                final boolean skipIfCompressionMatches,
+                                                                final int jobs) throws ExecutionException, InterruptedException
     {
-        return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs);
+        return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
     }
 
     public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fbfbbf3..c3e4f59 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
@@ -479,7 +480,38 @@ public class CompactionManager implements CompactionManagerMBean
         }, 0, OperationType.VERIFY);
     }
 
-    public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
+    public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
+                                                    final boolean skipIfCurrentVersion,
+                                                    final long skipIfOlderThanTimestamp,
+                                                    final boolean skipIfCompressionMatches,
+                                                    int jobs) throws InterruptedException, ExecutionException
+    {
+        return performSSTableRewrite(cfs, (sstable) -> {
+            // Skip if descriptor version matches current version
+            if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
+                return false;
+
+            // Skip if SSTable creation time is past given timestamp
+            if (sstable.getCreationTimeFor(Component.DATA) > skipIfOlderThanTimestamp)
+                return false;
+
+            TableMetadata metadata = cfs.metadata.get();
+            // Skip if SSTable compression parameters match current ones
+            if (skipIfCompressionMatches &&
+                ((!sstable.compression && !metadata.params.compression.isEnabled()) ||
+                 (sstable.compression && metadata.params.compression.equals(sstable.getCompressionMetadata().parameters))))
+                return false;
+
+            return true;
+        }, jobs);
+    }
+
+    /**
+     * Perform SSTable rewrite
+
+     * @param sstableFilter sstables for which predicate returns {@link false} will be excluded
+     */
+    public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs) throws InterruptedException, ExecutionException
     {
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
@@ -492,7 +524,7 @@ public class CompactionManager implements CompactionManagerMBean
                 while (iter.hasNext())
                 {
                     SSTableReader sstable = iter.next();
-                    if (excludeCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
+                    if (!sstableFilter.test(sstable))
                     {
                         transaction.cancel(sstable);
                         iter.remove();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 593e5fd..a976a28 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3683,12 +3683,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return upgradeSSTables(keyspaceName, excludeCurrentVersion, 0, tableNames);
     }
 
-    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    public int upgradeSSTables(String keyspaceName,
+                               final boolean skipIfCurrentVersion,
+                               final long skipIfNewerThanTimestamp,
+                               int jobs,
+                               String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return rewriteSSTables(keyspaceName, skipIfCurrentVersion, skipIfNewerThanTimestamp, false, jobs, tableNames);
+    }
+
+    public int recompressSSTables(String keyspaceName,
+                                  int jobs,
+                                  String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return rewriteSSTables(keyspaceName, false, Long.MAX_VALUE, true, jobs, tableNames);
+    }
+
+
+    public int rewriteSSTables(String keyspaceName,
+                               final boolean skipIfCurrentVersion,
+                               final long skipIfNewerThanTimestamp,
+                               final boolean skipIfCompressionMatches,
+                               int jobs,
+                               String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, tableNames))
         {
-            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion, jobs);
+            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
             if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                 status = oneStatus;
         }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4a27f84..f29dd89 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -366,7 +366,14 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     @Deprecated
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... tableNames) throws IOException, ExecutionException, InterruptedException;
-    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+    @Deprecated
+    default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return upgradeSSTables(keyspaceName, excludeCurrentVersion, Long.MAX_VALUE, jobs, tableNames);
+    }
+
+    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+    public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException;
 
     /**
      * Rewrites all sstables from the given tables to remove deleted data.
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d1b7528..2f65e58 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -330,9 +330,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames);
     }
 
-    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
-        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames);
+        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames);
     }
 
     public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
@@ -340,6 +340,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames);
     }
 
+    public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        return ssProxy.recompressSSTables(keyspaceName, jobs, tableNames);
+    }
+
     private void checkJobs(PrintStream out, int jobs)
     {
         int compactors = ssProxy.getConcurrentCompactors();
@@ -350,64 +355,59 @@ public class NodeProbe implements AutoCloseable
     public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         checkJobs(out, jobs);
-        switch (forceKeyspaceCleanup(jobs, keyspaceName, tableNames))
-        {
-            case 1:
-                failed = true;
-                out.println("Aborted cleaning up at least one table in keyspace "+keyspaceName+", check server logs for more information.");
-                break;
-            case 2:
-                failed = true;
-                out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
-                break;
-        }
+        perform(out, keyspaceName,
+                () -> forceKeyspaceCleanup(jobs, keyspaceName, tableNames),
+                "cleaning up");
     }
 
     public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
     {
         checkJobs(out, jobs);
-        switch (ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables))
-        {
-            case 1:
-                failed = true;
-                out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
-                break;
-            case 2:
-                failed = true;
-                out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
-                break;
-        }
+        perform(out, keyspaceName,
+                () -> scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables),
+                "scrubbing");
     }
 
     public void verify(PrintStream out, boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
-        switch (verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames))
-        {
-            case 1:
-                failed = true;
-                out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information.");
-                break;
-            case 2:
-                failed = true;
-                out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
-                break;
-        }
+        perform(out, keyspaceName,
+                () -> verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames),
+                "verifying");
     }
 
+    public void recompressSSTables(PrintStream out, String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        checkJobs(out, jobs);
+        perform(out, keyspaceName,
+                () -> recompressSSTables(keyspaceName, jobs, tableNames),
+                "recompressing sstables");
+    }
 
-    public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         checkJobs(out, jobs);
-        switch (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames))
+        perform(out, keyspaceName,
+                () -> upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames),
+                "upgrading sstables");
+    }
+
+    private static interface Job
+    {
+        int perform() throws IOException, ExecutionException, InterruptedException;
+    }
+
+    private void perform(PrintStream out, String ks, Job job, String jobName) throws IOException, ExecutionException, InterruptedException
+    {
+        switch (job.perform())
         {
             case 1:
-                failed = true;
-                out.println("Aborted upgrading sstables for at least one table in keyspace " + keyspaceName + ", check server logs for more information.");
+                out.printf("Aborted %s for at least one table in keyspace %s, check server logs for more information.\n",
+                           jobName, ks);
                 break;
             case 2:
                 failed = true;
-                out.println("Failed marking some sstables compacting in keyspace "+keyspaceName+", check server logs for more information");
-                break;
+                out.printf("Failed marking some sstables compacting in keyspace %s, check server logs for more information.\n",
+                           ks);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 1f5295f..02340ba 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -92,53 +92,63 @@ public class NodeTool
     public int execute(String... args)
     {
         List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList(
+                Assassinate.class,
                 CassHelp.class,
-                Info.class,
-                Ring.class,
-                NetStats.class,
-                CfStats.class,
-                TableStats.class,
                 CfHistograms.class,
-                TableHistograms.class,
+                CfStats.class,
                 Cleanup.class,
                 ClearSnapshot.class,
                 ClientStats.class,
                 Compact.class,
-                Scrub.class,
-                Verify.class,
-                Flush.class,
-                UpgradeSSTable.class,
-                GarbageCollect.class,
-                DisableAutoCompaction.class,
-                EnableAutoCompaction.class,
-                CompactionStats.class,
                 CompactionHistory.class,
+                CompactionStats.class,
                 Decommission.class,
                 DescribeCluster.class,
+                DescribeRing.class,
+                DisableAuditLog.class,
+                DisableAutoCompaction.class,
+                DisableBackup.class,
                 DisableBinary.class,
+                DisableFullQueryLog.class,
+                DisableGossip.class,
+                DisableHandoff.class,
+                DisableHintsForDC.class,
+                DisableOldProtocolVersions.class,
+                Drain.class,
+                EnableAuditLog.class,
+                EnableAutoCompaction.class,
+                EnableBackup.class,
                 EnableBinary.class,
+                EnableFullQueryLog.class,
                 EnableGossip.class,
-                DisableGossip.class,
                 EnableHandoff.class,
-                EnableFullQueryLog.class,
-                DisableFullQueryLog.class,
+                EnableHintsForDC.class,
+                EnableOldProtocolVersions.class,
+                FailureDetectorInfo.class,
+                Flush.class,
+                GarbageCollect.class,
                 GcStats.class,
                 GetAuditLog.class,
                 GetBatchlogReplayTrottle.class,
                 GetCompactionThreshold.class,
                 GetCompactionThroughput.class,
                 GetConcurrency.class,
+                GetConcurrentCompactors.class,
+                GetConcurrentViewBuilders.class,
+                GetEndpoints.class,
                 GetFullQueryLog.class,
-                GetTimeout.class,
-                GetStreamThroughput.class,
-                GetTraceProbability.class,
                 GetInterDCStreamThroughput.class,
-                GetEndpoints.class,
-                GetSeeds.class,
-                GetSSTables.class,
+                GetLoggingLevels.class,
                 GetMaxHintWindow.class,
+                GetSSTables.class,
+                GetSeeds.class,
+                GetSnapshotThrottle.class,
+                GetStreamThroughput.class,
+                GetTimeout.class,
+                GetTraceProbability.class,
                 GossipInfo.class,
                 Import.class,
+                Info.class,
                 InvalidateCounterCache.class,
                 InvalidateCredentialsCache.class,
                 InvalidateJmxPermissionsCache.class,
@@ -148,76 +158,66 @@ public class NodeTool
                 InvalidateRolesCache.class,
                 InvalidateRowCache.class,
                 Join.class,
+                ListSnapshots.class,
                 Move.class,
+                NetStats.class,
                 PauseHandoff.class,
-                ResumeHandoff.class,
                 ProfileLoad.class,
                 ProxyHistograms.class,
+                RangeKeySample.class,
                 Rebuild.class,
+                RebuildIndex.class,
+                RecompressSSTables.class,
                 Refresh.class,
-                RemoveNode.class,
-                Assassinate.class,
+                RefreshSizeEstimates.class,
+                ReloadLocalSchema.class,
                 ReloadSeeds.class,
-                ResetFullQueryLog.class,
+                ReloadSslCertificates.class,
+                ReloadTriggers.class,
+                RelocateSSTables.class,
+                RemoveNode.class,
                 Repair.class,
                 ReplayBatchlog.class,
-                SetCacheCapacity.class,
-                SetConcurrency.class,
-                SetHintedHandoffThrottleInKB.class,
+                ResetFullQueryLog.class,
+                ResetLocalSchema.class,
+                ResumeHandoff.class,
+                Ring.class,
+                Scrub.class,
                 SetBatchlogReplayThrottle.class,
+                SetCacheCapacity.class,
+                SetCacheKeysToSave.class,
                 SetCompactionThreshold.class,
                 SetCompactionThroughput.class,
-                GetConcurrentCompactors.class,
+                SetConcurrency.class,
                 SetConcurrentCompactors.class,
-                GetConcurrentViewBuilders.class,
                 SetConcurrentViewBuilders.class,
-                SetConcurrency.class,
-                SetTimeout.class,
-                SetStreamThroughput.class,
+                SetHintedHandoffThrottleInKB.class,
                 SetInterDCStreamThroughput.class,
-                SetTraceProbability.class,
+                SetLoggingLevel.class,
                 SetMaxHintWindow.class,
-                Snapshot.class,
-                ListSnapshots.class,
-                GetSnapshotThrottle.class,
                 SetSnapshotThrottle.class,
+                SetStreamThroughput.class,
+                SetTimeout.class,
+                SetTraceProbability.class,
+                Sjk.class,
+                Snapshot.class,
                 Status.class,
+                StatusAutoCompaction.class,
+                StatusBackup.class,
                 StatusBinary.class,
                 StatusGossip.class,
-                StatusBackup.class,
                 StatusHandoff.class,
-                StatusAutoCompaction.class,
                 Stop.class,
                 StopDaemon.class,
-                Version.class,
-                DescribeRing.class,
-                RebuildIndex.class,
-                RangeKeySample.class,
-                EnableBackup.class,
-                DisableBackup.class,
-                ResetLocalSchema.class,
-                ReloadLocalSchema.class,
-                ReloadTriggers.class,
-                SetCacheKeysToSave.class,
-                DisableHandoff.class,
-                Drain.class,
-                TruncateHints.class,
-                TpStats.class,
+                TableHistograms.class,
+                TableStats.class,
                 TopPartitions.class,
-                SetLoggingLevel.class,
-                GetLoggingLevels.class,
-                Sjk.class,
-                DisableHintsForDC.class,
-                EnableHintsForDC.class,
-                FailureDetectorInfo.class,
-                RefreshSizeEstimates.class,
-                RelocateSSTables.class,
-                ViewBuildStatus.class,
-                ReloadSslCertificates.class,
-                EnableAuditLog.class,
-                DisableAuditLog.class,
-                EnableOldProtocolVersions.class,
-                DisableOldProtocolVersions.class
+                TpStats.class,
+                TruncateHints.class,
+                UpgradeSSTable.class,
+                Verify.class,
+                Version.class,
+                ViewBuildStatus.class
         );
 
         Cli.CliBuilder<NodeToolCmdRunnable> builder = Cli.builder("nodetool");
diff --git a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java b/src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
similarity index 78%
copy from src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
copy to src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
index ba1b6f5..78ed7ec 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RecompressSSTables.java
@@ -27,15 +27,12 @@ import java.util.List;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "upgradesstables", description = "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version)")
-public class UpgradeSSTable extends NodeToolCmd
+@Command(name = "recompress_sstables", description = "Rewrite sstables (for the requested tables) that have compression configuration different from the current")
+public class RecompressSSTables extends NodeToolCmd
 {
     @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
     private List<String> args = new ArrayList<>();
 
-    @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
-    private boolean includeAll = false;
-
     @Option(title = "jobs",
             name = {"-j", "--jobs"},
             description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads")
@@ -51,7 +48,7 @@ public class UpgradeSSTable extends NodeToolCmd
         {
             try
             {
-                probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, jobs, tableNames);
+                probe.recompressSSTables(probe.output().out, keyspace, jobs, tableNames);
             }
             catch (Exception e)
             {
@@ -59,4 +56,4 @@ public class UpgradeSSTable extends NodeToolCmd
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
index ba1b6f5..cc94d8b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
@@ -33,9 +33,16 @@ public class UpgradeSSTable extends NodeToolCmd
     @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
     private List<String> args = new ArrayList<>();
 
-    @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
+    @Option(title = "include_all",
+            name = {"-a", "--include-all-sstables"},
+            description = "Use -a to include all sstables, even those already on the current version")
     private boolean includeAll = false;
 
+    @Option(title = "max_timestamp",
+            name = {"-t", "--max-timestamp"},
+            description = "Use -t to compact only SSTables that have local creation time _older_ than the given timestamp")
+    private long maxSSTableTimestamp = Long.MAX_VALUE;
+
     @Option(title = "jobs",
             name = {"-j", "--jobs"},
             description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads")
@@ -51,7 +58,7 @@ public class UpgradeSSTable extends NodeToolCmd
         {
             try
             {
-                probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, jobs, tableNames);
+                probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, maxSSTableTimestamp, jobs, tableNames);
             }
             catch (Exception e)
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
new file mode 100644
index 0000000..c599f17
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogAction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class UpgradeSSTablesTest extends TestBaseImpl
+{
+    @Test
+    public void rewriteSSTablesTest() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = builder().withNodes(1).withDataDirCount(1).start())
+        {
+            for (String compressionBefore : new String[]{ "{'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 32}", "{'enabled': 'false'}" })
+            {
+                for (String command : new String[]{ "upgradesstables", "recompress_sstables" })
+                {
+                    cluster.schemaChange(withKeyspace("DROP KEYSPACE IF EXISTS %s"));
+                    cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"));
+
+                    cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) " +
+                                                      "WITH compression = " + compressionBefore));
+                    cluster.get(1).acceptsOnInstance((String ks) -> {
+                        Keyspace.open(ks).getColumnFamilyStore("tbl").disableAutoCompaction();
+                    }).accept(KEYSPACE);
+
+                    String blob = "blob";
+                    for (int i = 0; i < 6; i++)
+                        blob += blob;
+
+                    for (int i = 0; i < 100; i++)
+                    {
+                        cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+                                                       ConsistencyLevel.QUORUM, i, i, blob);
+                    }
+                    cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+                    Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+                    cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH compression = {'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 128};"));
+
+                    Thread.sleep(2000); // Make sure timestamp will be different even with 1-second resolution.
+
+                    long maxSoFar = cluster.get(1).appliesOnInstance((String ks) -> {
+                        long maxTs = -1;
+                        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                        cfs.disableAutoCompaction();
+                        for (SSTableReader tbl : cfs.getLiveSSTables())
+                        {
+                            maxTs = Math.max(maxTs, tbl.getCreationTimeFor(Component.DATA));
+                        }
+                        return maxTs;
+                    }).apply(KEYSPACE);
+
+                    for (int i = 100; i < 200; i++)
+                    {
+                        cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+                                                       ConsistencyLevel.QUORUM, i, i, blob);
+                    }
+                    cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+                    LogAction logAction = cluster.get(1).logs();
+                    logAction.mark();
+
+                    long expectedCount = cluster.get(1).appliesOnInstance((String ks, Long maxTs) -> {
+                        long count = 0;
+                        long skipped = 0;
+                        Set<SSTableReader> liveSSTables = Keyspace.open(ks).getColumnFamilyStore("tbl").getLiveSSTables();
+                        assert liveSSTables.size() == 2 : String.format("Expected 2 sstables, but got " + liveSSTables.size());
+                        for (SSTableReader tbl : liveSSTables)
+                        {
+                            if (tbl.getCreationTimeFor(Component.DATA) <= maxTs)
+                                count++;
+                            else
+                                skipped++;
+                        }
+                        assert skipped > 0;
+                        return count;
+                    }).apply(KEYSPACE, maxSoFar);
+
+                    if (command.equals("upgradesstables"))
+                        Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", "-t", Long.toString(maxSoFar), KEYSPACE, "tbl"));
+                    else
+                        Assert.assertEquals(0, cluster.get(1).nodetool("recompress_sstables", KEYSPACE, "tbl"));
+
+                    Assert.assertFalse(logAction.grep(String.format("%d sstables to", expectedCount)).getResult().isEmpty());
+                }
+            }
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org