You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dj...@apache.org on 2020/05/06 20:33:16 UTC

[cassandra] branch trunk updated: bin/sstableverify should support user provided token ranges

This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d27c76  bin/sstableverify should support user provided token ranges
2d27c76 is described below

commit 2d27c769479f877eb3af308b49d73f8e124f4703
Author: David Capwell <dc...@apple.com>
AuthorDate: Wed Apr 22 11:13:24 2020 -0700

    bin/sstableverify should support user provided token ranges
    
    Patch by David Capwell; reviewed by Dinesh Joshi for CASSANDRA-15753
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/compaction/Verifier.java   | 36 +++++++++++-----
 .../org/apache/cassandra/tools/BulkLoader.java     | 17 ++++++++
 .../apache/cassandra/tools/StandaloneVerifier.java | 49 +++++++++++++++++++++-
 4 files changed, 92 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4b13ec5..ddf2d63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * bin/sstableverify should support user provided token ranges (CASSANDRA-15753)
  * Improve logging when mutation passed to commit log is too large (CASSANDRA-14781)
  * replace LZ4FastDecompressor with LZ4SafeDecompressor (CASSANDRA-15560)
  * Fix buffer pool NPE with concurrent release due to in-progress tiny pool eviction (CASSANDRA-15726)
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 551043a..2500a24 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -59,8 +59,8 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.function.Function;
 import java.util.function.LongPredicate;
-import java.util.function.Predicate;
 
 public class Verifier implements Closeable
 {
@@ -76,6 +76,12 @@ public class Verifier implements Closeable
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
     private final Options options;
     private final boolean isOffline;
+    /**
+     * Given a keyspace, return the set of local and pending token ranges.  By default {@link StorageService#getLocalAndPendingRanges(String)}
+     * is expected, but for the standalone verifier case we can't use that, so this is here to allow the CLI to provide
+     * the token ranges.
+     */
+    private final Function<String, ? extends Collection<Range<Token>>> tokenLookup;
 
     private int goodRows;
 
@@ -103,6 +109,7 @@ public class Verifier implements Closeable
         this.verifyInfo = new VerifyInfo(dataFile, sstable);
         this.options = options;
         this.isOffline = isOffline;
+        this.tokenLookup = options.tokenLookup;
     }
 
     public void verify()
@@ -130,7 +137,7 @@ public class Verifier implements Closeable
         }
         catch (Throwable t)
         {
-            outputHandler.debug(t.getMessage());
+            outputHandler.warn(t.getMessage());
             markAndThrow(false);
         }
 
@@ -141,7 +148,7 @@ public class Verifier implements Closeable
         }
         catch (Throwable t)
         {
-            outputHandler.debug(t.getMessage());
+            outputHandler.warn(t.getMessage());
             markAndThrow();
         }
 
@@ -153,7 +160,7 @@ public class Verifier implements Closeable
         catch (Throwable t)
         {
             outputHandler.output("Index summary is corrupt - if it is removed it will get rebuilt on startup "+sstable.descriptor.filenameFor(Component.SUMMARY));
-            outputHandler.debug(t.getMessage());
+            outputHandler.warn(t.getMessage());
             markAndThrow(false);
         }
 
@@ -165,7 +172,7 @@ public class Verifier implements Closeable
         }
         catch (Throwable t)
         {
-            outputHandler.debug(t.getMessage());
+            outputHandler.warn(t.getMessage());
             markAndThrow();
         }
 
@@ -174,7 +181,7 @@ public class Verifier implements Closeable
             outputHandler.debug("Checking that all tokens are owned by the current node");
             try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
             {
-                List<Range<Token>> ownedRanges = Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata.keyspace));
+                List<Range<Token>> ownedRanges = Range.normalize(tokenLookup.apply(cfs.metadata.keyspace));
                 if (ownedRanges.isEmpty())
                     return;
                 RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
@@ -213,7 +220,7 @@ public class Verifier implements Closeable
         }
         catch (IOException e)
         {
-            outputHandler.debug(e.getMessage());
+            outputHandler.warn(e.getMessage());
             markAndThrow();
         }
         finally
@@ -235,7 +242,7 @@ public class Verifier implements Closeable
                     markAndThrow();
             }
 
-            List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata().keyspace));
+            List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
             RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
             DecoratedKey prevKey = null;
 
@@ -543,8 +550,9 @@ public class Verifier implements Closeable
         public final boolean mutateRepairStatus;
         public final boolean checkOwnsTokens;
         public final boolean quick;
+        public final Function<String, ? extends Collection<Range<Token>>> tokenLookup;
 
-        private Options(boolean invokeDiskFailurePolicy, boolean extendedVerification, boolean checkVersion, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick)
+        private Options(boolean invokeDiskFailurePolicy, boolean extendedVerification, boolean checkVersion, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, Function<String, ? extends Collection<Range<Token>>> tokenLookup)
         {
             this.invokeDiskFailurePolicy = invokeDiskFailurePolicy;
             this.extendedVerification = extendedVerification;
@@ -552,6 +560,7 @@ public class Verifier implements Closeable
             this.mutateRepairStatus = mutateRepairStatus;
             this.checkOwnsTokens = checkOwnsTokens;
             this.quick = quick;
+            this.tokenLookup = tokenLookup;
         }
 
         @Override
@@ -575,6 +584,7 @@ public class Verifier implements Closeable
             private boolean mutateRepairStatus = false; // mutating repair status can be dangerous
             private boolean checkOwnsTokens = false;
             private boolean quick = false;
+            private Function<String, ? extends Collection<Range<Token>>> tokenLookup = StorageService.instance::getLocalAndPendingRanges;
 
             public Builder invokeDiskFailurePolicy(boolean param)
             {
@@ -612,9 +622,15 @@ public class Verifier implements Closeable
                 return this;
             }
 
+            public Builder tokenLookup(Function<String, ? extends Collection<Range<Token>>> tokenLookup)
+            {
+                this.tokenLookup = tokenLookup;
+                return this;
+            }
+
             public Options build()
             {
-                return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick);
+                return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick, tokenLookup);
             }
 
         }
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 42c2bf3..07b534c 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -310,6 +310,23 @@ public class BulkLoader
         }
 
         /**
+         * Add option with argument and argument name that accepts being defined multiple times as a list
+         * @param opt shortcut for option name
+         * @param longOpt complete option name
+         * @param argName argument name
+         * @param description description of the option
+         * @return updated Options object
+         */
+        public Options addOptionList(String opt, String longOpt, String argName, String description)
+        {
+            Option option = new Option(opt, longOpt, true, description);
+            option.setArgName(argName);
+            option.setArgs(Option.UNLIMITED_VALUES);
+
+            return addOption(option);
+        }
+
+        /**
          * Add option without argument
          * @param opt shortcut for option name
          * @param longOpt complete option name
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index 81e992e..9074418 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -23,6 +23,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -33,6 +36,8 @@ import org.apache.commons.cli.*;
 
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 
@@ -46,11 +51,13 @@ public class StandaloneVerifier
     private static final String CHECK_VERSION = "check_version";
     private static final String MUTATE_REPAIR_STATUS = "mutate_repair_status";
     private static final String QUICK = "quick";
+    private static final String TOKEN_RANGE = "token_range";
 
     public static void main(String args[])
     {
         Options options = Options.parseArgs(args);
         Util.initDatabaseDescriptor();
+        System.out.println("sstableverify using the following options: " + options);
 
         try
         {
@@ -97,8 +104,10 @@ public class StandaloneVerifier
                                                                .extendedVerification(options.extended)
                                                                .checkVersion(options.checkVersion)
                                                                .mutateRepairStatus(options.mutateRepairStatus)
-                                                               .checkOwnsTokens(false) // don't know the ranges when running offline
+                                                               .checkOwnsTokens(!options.tokens.isEmpty())
+                                                               .tokenLookup(ignore -> options.tokens)
                                                                .build();
+            handler.output("Running verifier with the following options: " + verifyOptions);
             for (SSTableReader sstable : sstables)
             {
                 try
@@ -145,6 +154,7 @@ public class StandaloneVerifier
         public boolean checkVersion;
         public boolean mutateRepairStatus;
         public boolean quick;
+        public Collection<Range<Token>> tokens;
 
         private Options(String keyspaceName, String cfName)
         {
@@ -187,6 +197,17 @@ public class StandaloneVerifier
                 opts.mutateRepairStatus = cmd.hasOption(MUTATE_REPAIR_STATUS);
                 opts.quick = cmd.hasOption(QUICK);
 
+                if (cmd.hasOption(TOKEN_RANGE))
+                {
+                    opts.tokens = Stream.of(cmd.getOptionValues(TOKEN_RANGE))
+                                        .map(StandaloneVerifier::parseTokenRange)
+                                        .collect(Collectors.toSet());
+                }
+                else
+                {
+                    opts.tokens = Collections.emptyList();
+                }
+
                 return opts;
             }
             catch (ParseException e)
@@ -196,6 +217,21 @@ public class StandaloneVerifier
             }
         }
 
+        public String toString()
+        {
+            return "Options{" +
+                   "keyspaceName='" + keyspaceName + '\'' +
+                   ", cfName='" + cfName + '\'' +
+                   ", debug=" + debug +
+                   ", verbose=" + verbose +
+                   ", extended=" + extended +
+                   ", checkVersion=" + checkVersion +
+                   ", mutateRepairStatus=" + mutateRepairStatus +
+                   ", quick=" + quick +
+                   ", tokens=" + tokens +
+                   '}';
+        }
+
         private static void errorMsg(String msg, CmdLineOptions options)
         {
             System.err.println(msg);
@@ -213,6 +249,7 @@ public class StandaloneVerifier
             options.addOption("c",  CHECK_VERSION,         "make sure sstables are the latest version");
             options.addOption("r",  MUTATE_REPAIR_STATUS,  "don't mutate repair status");
             options.addOption("q",  QUICK,                 "do a quick check, don't read all data");
+            options.addOptionList("t", TOKEN_RANGE, "range", "long token range of the format left,right. This may be provided multiple times to define multiple different ranges");
             return options;
         }
 
@@ -227,4 +264,14 @@ public class StandaloneVerifier
             new HelpFormatter().printHelp(usage, header.toString(), options, "");
         }
     }
+
+    private static Range<Token> parseTokenRange(String line)
+    {
+        String[] split = line.split(",");
+        if (split.length != 2)
+            throw new IllegalArgumentException("Unable to parse token range from " + line + "; format is left,right but saw " + split.length + " parts");
+        long left = Long.parseLong(split[0]);
+        long right = Long.parseLong(split[1]);
+        return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org