You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/10 19:40:22 UTC

[1/3] refactor nodetool, encapsulating each command into a subclass patch by Clément Lardeur; reviewed by Mikhail Stepura for CASSANDRA-6381

Updated Branches:
  refs/heads/trunk eca02fd25 -> 892d8e699


http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/src/java/org/apache/cassandra/tools/NodeToolHelp.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeToolHelp.java b/src/java/org/apache/cassandra/tools/NodeToolHelp.java
deleted file mode 100644
index c89e48c..0000000
--- a/src/java/org/apache/cassandra/tools/NodeToolHelp.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tools;
-
-import java.util.List;
-
-public class NodeToolHelp
-{
-    public List<NodeToolCommand> commands;
-
-    public static class NodeToolCommand
-    {
-        public String name;
-        public String help;
-
-        public String toString()
-        {
-            return name;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
deleted file mode 100644
index 42fda0d..0000000
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ /dev/null
@@ -1,217 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Help file for nodetool commands in Yaml.
-commands:
-  - name: ring
-    help: |
-      Print information about the token ring
-  - name: join
-    help: |
-      Join the ring
-  - name: info [-T/--tokens]
-    help: |
-      Print node information (uptime, load, ...)
-  - name: status
-    help: |
-      Print cluster information (state, load, IDs, ...)
-  - name: cfstats [keyspace].[cfname] ...
-    help: |
-      Print statistics on column families. Use the -i flag to ignore the list of column families and display the remaining cfs.
-  - name: version
-    help: |
-      Print cassandra version
-  - name: tpstats
-    help: |
-      Print usage statistics of thread pools
-  - name: proxyhistograms
-    help: |
-      Print statistic histograms for network operations
-  - name: drain
-    help: |
-      Drain the node (stop accepting writes and flush all column families)
-  - name: decommission
-    help: |
-      Decommission the *node I am connecting to*
-  - name: compactionstats
-    help: |
-      Print statistics on compactions
-  - name: compactionhistory
-    help: |
-      Print history of compaction
-  - name: disablebinary
-    help: |
-      Disable native transport (binary protocol)
-  - name: enablebinary
-    help: |
-      Reenable native transport (binary protocol)
-  - name: statusbinary
-    help: |
-      Status of native transport (binary protocol)
-  - name: disablehandoff
-    help: |
-      Disable the future hints storing on the current node
-  - name: enablehandoff
-    help: |
-      Reenable the future hints storing on the current node
-  - name: truncatehints <host-name>
-    help: |
-        Truncate all hints on the local node, or truncate hints for the endpoint specified.
-  - name: resumehandoff
-    help: |
-      Resume hints delivery process
-  - name: pausehandoff
-    help: |
-      Pause hints delivery process
-  - name: disablegossip
-    help: |
-      Disable gossip (effectively marking the node down)
-  - name: enablegossip
-    help: |
-      Reenable gossip
-  - name: disablethrift
-    help: |
-      Disable thrift server
-  - name: enablethrift
-    help: |
-      Reenable thrift server
-  - name: enablebackup
-    help: |
-      Enable incremental backup
-  - name: disablebackup
-    help: |
-      Disable incremental backup
-  - name: statusthrift
-    help: |
-      Status of thrift server
-  - name: gossipinfo
-    help: |
-      Shows the gossip information for the cluster
-  - name: invalidatekeycache
-    help: |
-      Invalidate the key cache
-  - name: invalidaterowcache
-    help: |
-      Invalidate the row cache
-  - name: resetlocalschema
-    help: |
-      Reset node's local schema and resync
-  - name: netstats [host]
-    help: |
-      Print network information on provided host (connecting node by default)
-  - name: move <new token>
-    help: |
-      Move node on the token ring to a new token. (for negative tokens, use \\ to escape, Example: move \\-123)
-  - name: removenode status|force|<ID>
-    help: |
-      Show status of current node removal, force completion of pending removal or remove provided ID
-  - name: setcompactionthroughput <value_in_mb>
-    help: |
-      Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling.
-  - name: setstreamthroughput  <value_in_mb>
-    help: |
-      Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling.
-  - name: describecluster
-    help: |
-      Print the name, snitch, partitioner and schema version of a cluster.
-  - name: describering [keyspace]
-    help: |
-      Shows the token ranges info of a given keyspace.
-  - name: rangekeysample
-    help: |
-      Shows the sampled keys held across all keyspaces.
-  - name: rebuild [src-dc-name]
-    help: |
-      Rebuild data by streaming from other nodes (similarly to bootstrap)
-  - name: settraceprobability [value]
-    help: |
-      Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default
-  - name: snapshot [keyspaces...] -cf [columnfamilyName] -t [snapshotName]
-    help: |
-      Take a snapshot of the optionally specified column family of the specified keyspaces  using optional name snapshotName
-  - name: clearsnapshot [keyspaces...] -t [snapshotName]
-    help: |
-      Remove snapshots for the specified keyspaces. Either remove all snapshots or remove the snapshots with the given name.
-  - name: flush [keyspace] [cfnames]
-    help: |
-      Flush one or more column families
-  - name: repair [keyspace] [cfnames]
-    help: |
-      Repair one or more column families
-         Use -dc to repair specific datacenters (csv list).
-         Use -et to specify a token at which repair range ends.
-         Use -local to only repair against nodes in the same datacenter.
-         Use -pr to repair only the first range returned by the partitioner.
-         Use -par to carry out a parallel repair.
-         Use -st to specify a token at which the repair range starts.
-  - name: cleanup [keyspace] [cfnames]
-    help: |
-      Run cleanup on one or more column families
-  - name: compact [keyspace] [cfnames]
-    help: |
-      Force a (major) compaction on one or more column families
-  - name: scrub [keyspace] [cfnames]
-    help: |
-      Scrub (rebuild sstables for) one or more column families
-  - name: upgradesstables [-a|--include-all-sstables] [keyspace] [cfnames]
-    help: |
-      Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version).
-         Use -a to include all sstables, even those already on the current version.
-  - name: setcompactionthreshold <keyspace> <cfname>
-    help: |
-      Set min and max compaction thresholds for a given column family
-  - name: getcompactionthreshold <keyspace> <cfname>
-    help: |
-      Print min and max compaction thresholds for a given column family
-  - name: disableautocompaction [keyspace] [cfnames]
-    help: |
-      Disable autocompaction for the given keyspace and column family
-  - name: enableautocompaction [keyspace] [cfnames]
-    help: |
-      Enable autocompaction
-  - name: getcompactionthroughput
-    help: |
-      Print the MB/s throughput cap for compaction in the system
-  - name: getstreamthroughput
-    help: |
-      Print the MB/s throughput cap for streaming in the system
-  - name: stop <compaction_type>
-    help: |
-      Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD
-  - name: cfhistograms <keyspace> <cfname>
-    help: |
-      Print statistic histograms for a given column family
-  - name: refresh <keyspace> <cf-name>
-    help: |
-      Load newly placed SSTables to the system without restart.
-  - name: rebuild_index <keyspace> <cf-name> <idx1,idx1>
-    help: |
-      a full rebuilds of native secondry index for a given column family. IndexNameExample: Standard3.IdxName,Standard3.IdxName1
-  - name: setcachecapacity <key-cache-capacity> <row-cache-capacity>
-    help: |
-      Set global key and row cache capacities (in MB units).
-  - name: setcachekeystosave <key-cache-keys-to-save> <row-cache-keys-to-save>
-    help: |
-      Set number of keys saved by each cache for faster post-restart warmup. 0 to disable.
-  - name: getendpoints <keyspace> <cf> <key>
-    help: |
-      Print the end points that owns the key
-  - name: getsstables <keyspace> <cf> <key>
-    help: |
-      Print the sstable filenames that own the key
-  - name: reloadtriggers
-    help: |
-      reload trigger classes


[2/3] refactor nodetool, encapsulating each command into a subclass patch by Clément Lardeur; reviewed by Mikhail Stepura for CASSANDRA-6381

Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
new file mode 100644
index 0000000..c9ddf4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -0,0 +1,2144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.MemoryUsage;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+
+import com.yammer.metrics.reporting.JmxReporter;
+import io.airlift.command.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManagerMBean;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static org.apache.commons.lang3.ArrayUtils.EMPTY_STRING_ARRAY;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.join;
+
+public class NodeTool
+{
+    private static final String HISTORYFILE = "nodetool.history";
+
+    public static void main(String... args)
+    {
+        List<Class<? extends Runnable>> commands = newArrayList(
+                Help.class,
+                Info.class,
+                Ring.class,
+                NetStats.class,
+                CfStats.class,
+                CfHistograms.class,
+                Cleanup.class,
+                ClearSnapshot.class,
+                Compact.class,
+                Scrub.class,
+                Flush.class,
+                UpgradeSSTable.class,
+                DisableAutoCompaction.class,
+                EnableAutoCompaction.class,
+                CompactionStats.class,
+                CompactionHistory.class,
+                Decommission.class,
+                DescribeCluster.class,
+                DisableBinary.class,
+                EnableBinary.class,
+                EnableGossip.class,
+                DisableGossip.class,
+                EnableHandoff.class,
+                EnableThrift.class,
+                GetCompactionThreshold.class,
+                GetCompactionThroughput.class,
+                GetStreamThroughput.class,
+                GetEndpoints.class,
+                GetSSTables.class,
+                GossipInfo.class,
+                InvalidateKeyCache.class,
+                InvalidateRowCache.class,
+                Join.class,
+                Move.class,
+                PauseHandoff.class,
+                ResumeHandoff.class,
+                ProxyHistograms.class,
+                Rebuild.class,
+                Refresh.class,
+                RemoveToken.class,
+                RemoveNode.class,
+                Repair.class,
+                SetCacheCapacity.class,
+                SetCompactionThreshold.class,
+                SetCompactionThroughput.class,
+                SetStreamThroughput.class,
+                SetTraceProbability.class,
+                Snapshot.class,
+                Status.class,
+                StatusBinary.class,
+                StatusThrift.class,
+                Stop.class,
+                StopDaemon.class,
+                Version.class,
+                DescribeRing.class,
+                RebuildIndex.class,
+                RangeKeySample.class,
+                EnableBackup.class,
+                DisableBackup.class,
+                ResetLocalSchema.class,
+                ReloadTriggers.class,
+                SetCacheKeysToSave.class,
+                DisableThrift.class,
+                DisableHandoff.class,
+                Drain.class,
+                TruncateHints.class,
+                TpStats.class
+        );
+
+        Cli<Runnable> parser = Cli.<Runnable>builder("nodetool")
+                .withDescription("Manage your Cassandra cluster")
+                .withDefaultCommand(Help.class)
+                .withCommands(commands)
+                .build();
+
+        int status = 0;
+        try
+        {
+            Runnable parse = parser.parse(args);
+            printHistory(args);
+            parse.run();
+        } catch (IllegalArgumentException |
+                IllegalStateException |
+                ParseArgumentsMissingException |
+                ParseArgumentsUnexpectedException |
+                ParseOptionConversionException |
+                ParseOptionMissingException |
+                ParseOptionMissingValueException |
+                ParseCommandMissingException |
+                ParseCommandUnrecognizedException e)
+        {
+            badUse(e);
+            status = 1;
+        } catch (Throwable throwable)
+        {
+            err(Throwables.getRootCause(throwable));
+            status = 2;
+        }
+
+        System.exit(status);
+    }
+
+    private static void printHistory(String... args)
+    {
+        //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
+        if (args.length == 0)
+            return;
+
+        String cmdLine = Joiner.on(" ").skipNulls().join(args);
+        cmdLine = cmdLine.replaceFirst("(?<=(-pw|--password))\\s+\\S+", " <hidden>");
+
+        try (FileWriter writer = new FileWriter(new File(FBUtilities.getToolsOutputDirectory(), HISTORYFILE), true))
+        {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+            writer.append(sdf.format(new Date())).append(": ").append(cmdLine).append(System.lineSeparator());
+        }
+        catch (IOException ioe)
+        {
+            //quietly ignore any errors about not being able to write out history
+        }
+    }
+
+    private static void badUse(Exception e)
+    {
+        System.out.println("nodetool: " + e.getMessage());
+        System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
+    }
+
+    private static void err(Throwable e)
+    {
+        System.err.println("error: " + e.getMessage());
+        System.err.println("-- StackTrace --");
+        System.err.println(getStackTraceAsString(e));
+    }
+
+    public static abstract class NodeToolCmd implements Runnable
+    {
+
+        @Option(type = OptionType.GLOBAL, name = {"-h", "--host"}, description = "Node hostname or ip address")
+        private String host = "127.0.0.1";
+
+        @Option(type = OptionType.GLOBAL, name = {"-p", "--port"}, description = "Remote jmx agent port number")
+        private String port = "7199";
+
+        @Option(type = OptionType.GLOBAL, name = {"-u", "--username"}, description = "Remote jmx agent username")
+        private String username = EMPTY;
+
+        @Option(type = OptionType.GLOBAL, name = {"-pw", "--password"}, description = "Remote jmx agent password")
+        private String password = EMPTY;
+
+        @Override
+        public void run()
+        {
+            try (NodeProbe probe = connect())
+            {
+                execute(probe);
+            } catch (IOException e)
+            {
+                throw new RuntimeException("Error while closing JMX connection", e);
+            }
+
+        }
+
+        protected abstract void execute(NodeProbe probe);
+
+        private NodeProbe connect()
+        {
+            NodeProbe nodeClient = null;
+
+            try
+            {
+                if (username.isEmpty())
+                    nodeClient = new NodeProbe(host, parseInt(port));
+                else
+                    nodeClient = new NodeProbe(host, parseInt(port), username, password);
+            } catch (IOException e)
+            {
+                Throwable rootCause = Throwables.getRootCause(e);
+                System.err.println(format("nodetool: Failed to connect to '%s:%s' - %s: '%s'.", host, port, rootCause.getClass().getSimpleName(), rootCause.getMessage()));
+                System.exit(1);
+            }
+
+            return nodeClient;
+        }
+
+        protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe)
+        {
+            List<String> keyspaces = new ArrayList<>();
+
+            if (cmdArgs == null || cmdArgs.isEmpty())
+                keyspaces.addAll(nodeProbe.getKeyspaces());
+            else
+                keyspaces.add(cmdArgs.get(0));
+
+            for (String keyspace : keyspaces)
+            {
+                if (!nodeProbe.getKeyspaces().contains(keyspace))
+                    throw new IllegalArgumentException("Keyspace [" + keyspace + "] does not exist.");
+            }
+
+            return Collections.unmodifiableList(keyspaces);
+        }
+
+        protected String[] parseOptionalColumnFamilies(List<String> cmdArgs)
+        {
+            return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
+        }
+    }
+
+    @Command(name = "info", description = "Print node information (uptime, load, ...)")
+    public static class Info extends NodeToolCmd
+    {
+        @Option(name = {"-T", "--tokens"}, description = "Display all tokens")
+        private boolean tokens = false;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            boolean gossipInitialized = probe.isInitialized();
+
+            System.out.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
+            System.out.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
+            System.out.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
+            System.out.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
+            System.out.printf("%-17s: %s%n", "Load", probe.getLoadString());
+            if (gossipInitialized)
+                System.out.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
+            else
+                System.out.printf("%-17s: %s%n", "Generation No", 0);
+
+            // Uptime
+            long secondsUp = probe.getUptime() / 1000;
+            System.out.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
+
+            // Memory usage
+            MemoryUsage heapUsage = probe.getHeapMemoryUsage();
+            double memUsed = (double) heapUsage.getUsed() / (1024 * 1024);
+            double memMax = (double) heapUsage.getMax() / (1024 * 1024);
+            System.out.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
+
+            // Data Center/Rack
+            System.out.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
+            System.out.printf("%-17s: %s%n", "Rack", probe.getRack());
+
+            // Exceptions
+            System.out.printf("%-17s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
+
+            CacheServiceMBean cacheService = probe.getCacheServiceMBean();
+
+            // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+            System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+                    "Key Cache",
+                    probe.getCacheMetric("KeyCache", "Entries"),
+                    probe.getCacheMetric("KeyCache", "Size"),
+                    probe.getCacheMetric("KeyCache", "Capacity"),
+                    probe.getCacheMetric("KeyCache", "Hits"),
+                    probe.getCacheMetric("KeyCache", "Requests"),
+                    probe.getCacheMetric("KeyCache", "HitRate"),
+                    cacheService.getKeyCacheSavePeriodInSeconds());
+
+            // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+            System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+                    "Row Cache",
+                    probe.getCacheMetric("RowCache", "Entries"),
+                    probe.getCacheMetric("RowCache", "Size"),
+                    probe.getCacheMetric("RowCache", "Capacity"),
+                    probe.getCacheMetric("RowCache", "Hits"),
+                    probe.getCacheMetric("RowCache", "Requests"),
+                    probe.getCacheMetric("RowCache", "HitRate"),
+                    cacheService.getRowCacheSavePeriodInSeconds());
+
+            // Tokens
+            List<String> tokens = probe.getTokens();
+            if (tokens.size() == 1 || this.tokens)
+                for (String token : tokens)
+                    System.out.printf("%-17s: %s%n", "Token", token);
+            else
+                System.out.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", tokens.size());
+        }
+    }
+
+    @Command(name = "ring", description = "Print information about the token ring")
+    public static class Ring extends NodeToolCmd
+    {
+        @Arguments(description = "Specify a keyspace for accurate ownership information (topology awareness)")
+        private String keyspace = null;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
+            LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
+            for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
+                endpointsToTokens.put(entry.getValue(), entry.getKey());
+
+            int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
+            {
+                @Override
+                public int compare(String first, String second)
+                {
+                    return ((Integer) first.length()).compareTo(second.length());
+                }
+            }).length();
+
+            String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
+            String format = format(formatPlaceholder, maxAddressLength);
+
+            // Calculate per-token ownership of the ring
+            Map<InetAddress, Float> ownerships;
+            try
+            {
+                ownerships = probe.effectiveOwnership(keyspace);
+            } catch (IllegalStateException ex)
+            {
+                ownerships = probe.getOwnership();
+                System.out.printf("Note: Ownership information does not include topology; for complete information, specify a keyspace%n");
+            }
+            try
+            {
+                System.out.println();
+                Map<String, Map<InetAddress, Float>> perDcOwnerships = Maps.newLinkedHashMap();
+                // get the different datasets and map to tokens
+                for (Map.Entry<InetAddress, Float> ownership : ownerships.entrySet())
+                {
+                    String dc = probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress());
+                    if (!perDcOwnerships.containsKey(dc))
+                        perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, Float>());
+                    perDcOwnerships.get(dc).put(ownership.getKey(), ownership.getValue());
+                }
+                for (Map.Entry<String, Map<InetAddress, Float>> entry : perDcOwnerships.entrySet())
+                    printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue());
+            } catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            if (DatabaseDescriptor.getNumTokens() > 1)
+            {
+                System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
+                System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
+            }
+        }
+
+        private void printDc(NodeProbe probe, String format,
+                             String dc,
+                             LinkedHashMultimap<String, String> endpointsToTokens,
+                             Map<InetAddress, Float> filteredOwnerships)
+        {
+            Collection<String> liveNodes = probe.getLiveNodes();
+            Collection<String> deadNodes = probe.getUnreachableNodes();
+            Collection<String> joiningNodes = probe.getJoiningNodes();
+            Collection<String> leavingNodes = probe.getLeavingNodes();
+            Collection<String> movingNodes = probe.getMovingNodes();
+            Map<String, String> loadMap = probe.getLoadMap();
+
+            System.out.println("Datacenter: " + dc);
+            System.out.println("==========");
+
+            // get the total amount of replicas for this dc and the last token in this dc's ring
+            List<String> tokens = new ArrayList<>();
+            String lastToken = "";
+
+            for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+            {
+                tokens.addAll(endpointsToTokens.get(entry.getKey().getHostAddress()));
+                lastToken = tokens.get(tokens.size() - 1);
+            }
+
+
+            System.out.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token");
+
+            if (filteredOwnerships.size() > 1)
+                System.out.printf(format, "", "", "", "", "", "", lastToken);
+            else
+                System.out.println();
+
+            for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+            {
+                String endpoint = entry.getKey().getHostAddress();
+                for (String token : endpointsToTokens.get(endpoint))
+                {
+                    String rack;
+                    try
+                    {
+                        rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
+                    } catch (UnknownHostException e)
+                    {
+                        rack = "Unknown";
+                    }
+
+                    String status = liveNodes.contains(endpoint)
+                                    ? "Up"
+                                    : deadNodes.contains(endpoint)
+                                      ? "Down"
+                                      : "?";
+
+                    String state = "Normal";
+
+                    if (joiningNodes.contains(endpoint))
+                        state = "Joining";
+                    else if (leavingNodes.contains(endpoint))
+                        state = "Leaving";
+                    else if (movingNodes.contains(endpoint))
+                        state = "Moving";
+
+                    String load = loadMap.containsKey(endpoint)
+                                  ? loadMap.get(endpoint)
+                                  : "?";
+                    String owns = new DecimalFormat("##0.00%").format(entry.getValue());
+                    System.out.printf(format, endpoint, rack, status, state, load, owns, token);
+                }
+            }
+            System.out.println();
+        }
+    }
+
+    @Command(name = "netstats", description = "Print network information on provided host (connecting node by default)")
+    public static class NetStats extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.printf("Mode: %s%n", probe.getOperationMode());
+            Set<StreamState> statuses = probe.getStreamStatus();
+            if (statuses.isEmpty())
+                System.out.println("Not sending any streams.");
+            for (StreamState status : statuses)
+            {
+                System.out.printf("%s %s%n", status.description, status.planId.toString());
+                for (SessionInfo info : status.sessions)
+                {
+                    System.out.printf("    %s%n", info.peer.toString());
+                    if (!info.receivingSummaries.isEmpty())
+                    {
+                        System.out.printf("        Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
+                        for (ProgressInfo progress : info.getReceivingFiles())
+                        {
+                            System.out.printf("            %s%n", progress.toString());
+                        }
+                    }
+                    if (!info.sendingSummaries.isEmpty())
+                    {
+                        System.out.printf("        Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
+                        for (ProgressInfo progress : info.getSendingFiles())
+                        {
+                            System.out.printf("            %s%n", progress.toString());
+                        }
+                    }
+                }
+            }
+
+            System.out.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
+
+            MessagingServiceMBean ms = probe.msProxy;
+            System.out.printf("%-25s", "Pool Name");
+            System.out.printf("%10s", "Active");
+            System.out.printf("%10s", "Pending");
+            System.out.printf("%15s%n", "Completed");
+
+            int pending;
+            long completed;
+
+            pending = 0;
+            for (int n : ms.getCommandPendingTasks().values())
+                pending += n;
+            completed = 0;
+            for (long n : ms.getCommandCompletedTasks().values())
+                completed += n;
+            System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed);
+
+            pending = 0;
+            for (int n : ms.getResponsePendingTasks().values())
+                pending += n;
+            completed = 0;
+            for (long n : ms.getResponseCompletedTasks().values())
+                completed += n;
+            System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed);
+        }
+    }
+
+    @Command(name = "cfstats", description = "Print statistics on column families")
+    public static class CfStats extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace.cfname>...]", description = "List of column families (or keyspace) names")
+        private List<String> cfnames = new ArrayList<>();
+
+        @Option(name = "-i", description = "Ignore the list of column families and display the remaining cfs")
+        private boolean ignore = false;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            OptionFilter filter = new OptionFilter(ignore, cfnames);
+            Map<String, List<ColumnFamilyStoreMBean>> cfstoreMap = new HashMap<>();
+
+            // get a list of column family stores
+            Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
+
+            while (cfamilies.hasNext())
+            {
+                Map.Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
+                String keyspaceName = entry.getKey();
+                ColumnFamilyStoreMBean cfsProxy = entry.getValue();
+
+                if (!cfstoreMap.containsKey(keyspaceName) && filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+                {
+                    List<ColumnFamilyStoreMBean> columnFamilies = new ArrayList<>();
+                    columnFamilies.add(cfsProxy);
+                    cfstoreMap.put(keyspaceName, columnFamilies);
+                } else if (filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+                {
+                    cfstoreMap.get(keyspaceName).add(cfsProxy);
+                }
+            }
+
+            // make sure all specified kss and cfs exist
+            filter.verifyKeyspaces(probe.getKeyspaces());
+            filter.verifyColumnFamilies();
+
+            // print out the table statistics
+            for (Map.Entry<String, List<ColumnFamilyStoreMBean>> entry : cfstoreMap.entrySet())
+            {
+                String keyspaceName = entry.getKey();
+                List<ColumnFamilyStoreMBean> columnFamilies = entry.getValue();
+                long keyspaceReadCount = 0;
+                long keyspaceWriteCount = 0;
+                int keyspacePendingTasks = 0;
+                double keyspaceTotalReadTime = 0.0f;
+                double keyspaceTotalWriteTime = 0.0f;
+
+                System.out.println("Keyspace: " + keyspaceName);
+                for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+                {
+                    String cfName = cfstore.getColumnFamilyName();
+                    long writeCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount();
+                    long readCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount();
+
+                    if (readCount > 0)
+                    {
+                        keyspaceReadCount += readCount;
+                        keyspaceTotalReadTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadTotalLatency");
+                    }
+                    if (writeCount > 0)
+                    {
+                        keyspaceWriteCount += writeCount;
+                        keyspaceTotalWriteTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteTotalLatency");
+                    }
+                    keyspacePendingTasks += (int) probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks");
+                }
+
+                double keyspaceReadLatency = keyspaceReadCount > 0
+                                             ? keyspaceTotalReadTime / keyspaceReadCount / 1000
+                                             : Double.NaN;
+                double keyspaceWriteLatency = keyspaceWriteCount > 0
+                                              ? keyspaceTotalWriteTime / keyspaceWriteCount / 1000
+                                              : Double.NaN;
+
+                System.out.println("\tRead Count: " + keyspaceReadCount);
+                System.out.println("\tRead Latency: " + format("%s", keyspaceReadLatency) + " ms.");
+                System.out.println("\tWrite Count: " + keyspaceWriteCount);
+                System.out.println("\tWrite Latency: " + format("%s", keyspaceWriteLatency) + " ms.");
+                System.out.println("\tPending Tasks: " + keyspacePendingTasks);
+
+                // print out column family statistics for this keyspace
+                for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+                {
+                    String cfName = cfstore.getColumnFamilyName();
+                    if (cfName.contains("."))
+                        System.out.println("\t\tTable (index): " + cfName);
+                    else
+                        System.out.println("\t\tTable: " + cfName);
+
+                    System.out.println("\t\tSSTable count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveSSTableCount"));
+
+                    int[] leveledSStables = cfstore.getSSTableCountPerLevel();
+                    if (leveledSStables != null)
+                    {
+                        System.out.print("\t\tSSTables in each level: [");
+                        for (int level = 0; level < leveledSStables.length; level++)
+                        {
+                            int count = leveledSStables[level];
+                            System.out.print(count);
+                            long maxCount = 4L; // for L0
+                            if (level > 0)
+                                maxCount = (long) Math.pow(10, level);
+                            //  show max threshold for level when exceeded
+                            if (count > maxCount)
+                                System.out.print("/" + maxCount);
+
+                            if (level < leveledSStables.length - 1)
+                                System.out.print(", ");
+                            else
+                                System.out.println("]");
+                        }
+                    }
+                    System.out.println("\t\tSpace used (live), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveDiskSpaceUsed"));
+                    System.out.println("\t\tSpace used (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "TotalDiskSpaceUsed"));
+                    System.out.println("\t\tSpace used by snapshots (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "SnapshotsSize"));
+                    System.out.println("\t\tSSTable Compression Ratio: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionRatio"));
+                    System.out.println("\t\tMemtable cell count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableColumnsCount"));
+                    System.out.println("\t\tMemtable data size, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableDataSize"));
+                    System.out.println("\t\tMemtable switch count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableSwitchCount"));
+                    System.out.println("\t\tLocal read count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount());
+                    double localReadLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getMean() / 1000;
+                    double localRLatency = localReadLatency > 0 ? localReadLatency : Double.NaN;
+                    System.out.printf("\t\tLocal read latency: %01.3f ms%n", localRLatency);
+                    System.out.println("\t\tLocal write count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount());
+                    double localWriteLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getMean() / 1000;
+                    double localWLatency = localWriteLatency > 0 ? localWriteLatency : Double.NaN;
+                    System.out.printf("\t\tLocal write latency: %01.3f ms%n", localWLatency);
+                    System.out.println("\t\tPending tasks: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks"));
+                    System.out.println("\t\tBloom filter false positives: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterFalsePositives"));
+                    System.out.println("\t\tBloom filter false ratio: " + format("%01.5f", probe.getColumnFamilyMetric(keyspaceName, cfName, "RecentBloomFilterFalseRatio")));
+                    System.out.println("\t\tBloom filter space used, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterDiskSpaceUsed"));
+                    System.out.println("\t\tCompacted partition minimum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MinRowSize"));
+                    System.out.println("\t\tCompacted partition maximum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MaxRowSize"));
+                    System.out.println("\t\tCompacted partition mean bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MeanRowSize"));
+                    System.out.println("\t\tAverage live cells per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveScannedHistogram")).getMean());
+                    System.out.println("\t\tAverage tombstones per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "TombstoneScannedHistogram")).getMean());
+
+                    System.out.println("");
+                }
+                System.out.println("----------------");
+            }
+        }
+
+        /**
+         * Used for filtering keyspaces and columnfamilies to be displayed using the cfstats command.
+         */
+        private static class OptionFilter
+        {
+            private Map<String, List<String>> filter = new HashMap<>();
+            private Map<String, List<String>> verifier = new HashMap<>();
+            private List<String> filterList = new ArrayList<>();
+            private boolean ignoreMode;
+
+            public OptionFilter(boolean ignoreMode, List<String> filterList)
+            {
+                this.filterList.addAll(filterList);
+                this.ignoreMode = ignoreMode;
+
+                for (String s : filterList)
+                {
+                    String[] keyValues = s.split("\\.", 2);
+
+                    // build the map that stores the ks' and cfs to use
+                    if (!filter.containsKey(keyValues[0]))
+                    {
+                        filter.put(keyValues[0], new ArrayList<String>());
+                        verifier.put(keyValues[0], new ArrayList<String>());
+
+                        if (keyValues.length == 2)
+                        {
+                            filter.get(keyValues[0]).add(keyValues[1]);
+                            verifier.get(keyValues[0]).add(keyValues[1]);
+                        }
+                    } else
+                    {
+                        if (keyValues.length == 2)
+                        {
+                            filter.get(keyValues[0]).add(keyValues[1]);
+                            verifier.get(keyValues[0]).add(keyValues[1]);
+                        }
+                    }
+                }
+            }
+
+            public boolean isColumnFamilyIncluded(String keyspace, String columnFamily)
+            {
+                // supplying empty params list is treated as wanting to display all kss & cfs
+                if (filterList.isEmpty())
+                    return !ignoreMode;
+
+                List<String> cfs = filter.get(keyspace);
+
+                // no such keyspace is in the map
+                if (cfs == null)
+                    return ignoreMode;
+                    // only a keyspace with no cfs was supplied
+                    // so ignore or include (based on the flag) every column family in specified keyspace
+                else if (cfs.size() == 0)
+                    return !ignoreMode;
+
+                // keyspace exists, and it contains specific cfs
+                verifier.get(keyspace).remove(columnFamily);
+                return ignoreMode ^ cfs.contains(columnFamily);
+            }
+
+            public void verifyKeyspaces(List<String> keyspaces)
+            {
+                for (String ks : verifier.keySet())
+                    if (!keyspaces.contains(ks))
+                        throw new IllegalArgumentException("Unknown keyspace: " + ks);
+            }
+
+            public void verifyColumnFamilies()
+            {
+                for (String ks : filter.keySet())
+                    if (verifier.get(ks).size() > 0)
+                        throw new IllegalArgumentException("Unknown column families: " + verifier.get(ks).toString() + " in keyspace: " + ks);
+            }
+        }
+    }
+
+    @Command(name = "cfhistograms", description = "Print statistic histograms for a given column family")
+    public static class CfHistograms extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 2, "cfhistograms requires ks and cf args");
+
+            String keyspace = args.get(0);
+            String cfname = args.get(1);
+
+            // calculate percentile of row size and column count
+            long[] estimatedRowSize = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedRowSizeHistogram");
+            long[] estimatedColumnCount = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedColumnCountHistogram");
+
+            long[] bucketOffsets = new EstimatedHistogram().getBucketOffsets();
+            EstimatedHistogram rowSizeHist = new EstimatedHistogram(bucketOffsets, estimatedRowSize);
+            EstimatedHistogram columnCountHist = new EstimatedHistogram(bucketOffsets, estimatedColumnCount);
+
+            // build arrays to store percentile values
+            double[] estimatedRowSizePercentiles = new double[7];
+            double[] estimatedColumnCountPercentiles = new double[7];
+            double[] offsetPercentiles = new double[]{0.5, 0.75, 0.95, 0.98, 0.99};
+            for (int i = 0; i < offsetPercentiles.length; i++)
+            {
+                estimatedRowSizePercentiles[i] = rowSizeHist.percentile(offsetPercentiles[i]);
+                estimatedColumnCountPercentiles[i] = columnCountHist.percentile(offsetPercentiles[i]);
+            }
+
+            // min value
+            estimatedRowSizePercentiles[5] = rowSizeHist.min();
+            estimatedColumnCountPercentiles[5] = columnCountHist.min();
+            // max value
+            estimatedRowSizePercentiles[6] = rowSizeHist.max();
+            estimatedColumnCountPercentiles[6] = columnCountHist.max();
+
+            String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+            double[] readLatency = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "ReadLatency"));
+            double[] writeLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspace, cfname, "WriteLatency"));
+            double[] sstablesPerRead = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "SSTablesPerReadHistogram"));
+
+            System.out.println(format("%s/%s histograms", keyspace, cfname));
+            System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+                    "Percentile", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
+            System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+                    "", "", "(micros)", "(micros)", "(bytes)", ""));
+
+            for (int i = 0; i < percentiles.length; i++)
+            {
+                System.out.println(format("%-10s%10.2f%18.2f%18.2f%18.0f%18.0f",
+                        percentiles[i],
+                        sstablesPerRead[i],
+                        writeLatency[i],
+                        readLatency[i],
+                        estimatedRowSizePercentiles[i],
+                        estimatedColumnCountPercentiles[i]));
+            }
+            System.out.println();
+        }
+    }
+
+    @Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces")
+    public static class Cleanup extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                if (Keyspace.SYSTEM_KS.equals(keyspace))
+                    continue;
+
+                try
+                {
+                    probe.forceKeyspaceCleanup(keyspace, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during cleanup", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "clearsnapshot", description = "Remove the snapshot with the given name from the given keyspaces. If no snapshotName is specified we will remove all snapshots")
+    public static class ClearSnapshot extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspaces>...] ", description = "Remove snapshots from the given keyspaces")
+        private List<String> keyspaces = new ArrayList<>();
+
+        @Option(title = "snapshot_name", name = "-t", description = "Remove the snapshot with a given name")
+        private String snapshotName = EMPTY;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("Requested clearing snapshot(s) for ");
+
+            if (keyspaces.isEmpty())
+                sb.append("[all keyspaces]");
+            else
+                sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+            if (!snapshotName.isEmpty())
+                sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+            System.out.println(sb.toString());
+
+            try
+            {
+                probe.clearSnapshot(snapshotName, toArray(keyspaces, String.class));
+            } catch (IOException e)
+            {
+                throw new RuntimeException("Error during clearing snapshots", e);
+            }
+        }
+    }
+
+    @Command(name = "compact", description = "Force a (major) compaction on one or more column families")
+    public static class Compact extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.forceKeyspaceCompaction(keyspace, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during compaction", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "flush", description = "Flush one or more column families")
+    public static class Flush extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.forceKeyspaceFlush(keyspace, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during flushing", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more column families")
+    public static class Scrub extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Option(title = "disable_snapshot", name = {"-ns", "--no-snapshot"}, description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
+        private boolean disableSnapshot = false;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.scrub(disableSnapshot, keyspace, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during flushing", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and column family")
+    public static class DisableAutoCompaction extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.disableAutoCompaction(keyspace, cfnames);
+                } catch (IOException e)
+                {
+                    throw new RuntimeException("Error occurred during disabling auto-compaction", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "enableautocompaction", description = "Enable autocompaction for the given keyspace and column family")
+    public static class EnableAutoCompaction extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.enableAutoCompaction(keyspace, cfnames);
+                } catch (IOException e)
+                {
+                    throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "upgradesstables", description = "Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version)")
+    public static class UpgradeSSTable extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
+        private boolean includeAll = false;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.upgradeSSTables(keyspace, !includeAll, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "compactionstats", description = "Print statistics on compactions")
+    public static class CompactionStats extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            int compactionThroughput = probe.getCompactionThroughput();
+            CompactionManagerMBean cm = probe.getCompactionManagerProxy();
+            System.out.println("pending tasks: " + probe.getCompactionMetric("PendingTasks"));
+            if (cm.getCompactions().size() > 0)
+                System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+            long remainingBytes = 0;
+            for (Map<String, String> c : cm.getCompactions())
+            {
+                String percentComplete = new Long(c.get("total")) == 0
+                                         ? "n/a"
+                                         : new DecimalFormat("0.00").format((double) new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%";
+                System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), c.get("unit"), percentComplete);
+                if (c.get("taskType").equals(OperationType.COMPACTION.toString()))
+                    remainingBytes += (new Long(c.get("total")) - new Long(c.get("completed")));
+            }
+            long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes == 0
+                                       ? -1
+                                       : (remainingBytes) / (1024L * 1024L * compactionThroughput);
+            String remainingTime = remainingTimeInSecs < 0
+                                   ? "n/a"
+                                   : format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
+
+            System.out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
+        }
+    }
+
+    @Command(name = "compactionhistory", description = "Print history of compaction")
+    public static class CompactionHistory extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("Compaction History: ");
+
+            TabularData tabularData = probe.getCompactionHistory();
+            if (tabularData.isEmpty())
+            {
+                System.out.printf("There is no compaction history");
+                return;
+            }
+
+            String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
+            List<String> indexNames = tabularData.getTabularType().getIndexNames();
+            System.out.printf(format, toArray(indexNames, String.class));
+
+            Set<?> values = tabularData.keySet();
+            for (Object eachValue : values)
+            {
+                List<?> value = (List<?>) eachValue;
+                System.out.printf(format, toArray(value, Object.class));
+            }
+        }
+    }
+
+    @Command(name = "decommission", description = "Decommission the *node I am connecting to*")
+    public static class Decommission extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            try
+            {
+                probe.decommission();
+            } catch (InterruptedException e)
+            {
+                throw new RuntimeException("Error decommissioning node", e);
+            }
+        }
+    }
+
+    @Command(name = "describecluster", description = "Print the name, snitch, partitioner and schema version of a cluster")
+    public static class DescribeCluster extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            // display cluster name, snitch and partitioner
+            System.out.println("Cluster Information:");
+            System.out.println("\tName: " + probe.getClusterName());
+            System.out.println("\tSnitch: " + probe.getEndpointSnitchInfoProxy().getSnitchName());
+            System.out.println("\tPartitioner: " + probe.getPartitioner());
+
+            // display schema version for each node
+            System.out.println("\tSchema versions:");
+            Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions();
+            for (String version : schemaVersions.keySet())
+            {
+                System.out.println(format("\t\t%s: %s%n", version, schemaVersions.get(version)));
+            }
+        }
+    }
+
+    @Command(name = "disablebinary", description = "Disable native transport (binary protocol)")
+    public static class DisableBinary extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.stopNativeTransport();
+        }
+    }
+
+    @Command(name = "enablebinary", description = "Reenable native transport (binary protocol)")
+    public static class EnableBinary extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.startNativeTransport();
+        }
+    }
+
+    @Command(name = "enablegossip", description = "Reenable gossip")
+    public static class EnableGossip extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.startGossiping();
+        }
+    }
+
+    @Command(name = "disablegossip", description = "Disable gossip (effectively marking the node down)")
+    public static class DisableGossip extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.stopGossiping();
+        }
+    }
+
+    @Command(name = "enablehandoff", description = "Reenable the future hints storing on the current node")
+    public static class EnableHandoff extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.enableHintedHandoff();
+        }
+    }
+
+    @Command(name = "enablethrift", description = "Reenable thrift server")
+    public static class EnableThrift extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.startThriftServer();
+        }
+    }
+
+    @Command(name = "getcompactionthreshold", description = "Print min and max compaction thresholds for a given column family")
+    public static class GetCompactionThreshold extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace with a column family")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 2, "getcompactionthreshold requires ks and cf args");
+            String ks = args.get(0);
+            String cf = args.get(1);
+
+            ColumnFamilyStoreMBean cfsProxy = probe.getCfsProxy(ks, cf);
+            System.out.println("Current compaction thresholds for " + ks + "/" + cf + ": \n" +
+                    " min = " + cfsProxy.getMinimumCompactionThreshold() + ", " +
+                    " max = " + cfsProxy.getMaximumCompactionThreshold());
+        }
+    }
+
+    @Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
+    public static class GetCompactionThroughput extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
+        }
+    }
+
+    @Command(name = "getstreamthroughput", description = "Print the MB/s throughput cap for streaming in the system")
+    public static class GetStreamThroughput extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("Current stream throughput: " + probe.getStreamThroughput() + " MB/s");
+        }
+    }
+
+    @Command(name = "getendpoints", description = "Print the end points that owns the key")
+    public static class GetEndpoints extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key for which we need to find the endpoint")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 3, "getendpoints requires ks, cf and key args");
+            String ks = args.get(0);
+            String cf = args.get(1);
+            String key = args.get(2);
+
+            List<InetAddress> endpoints = probe.getEndpoints(ks, cf, key);
+            for (InetAddress endpoint : endpoints)
+            {
+                System.out.println(endpoint.getHostAddress());
+            }
+        }
+    }
+
+    @Command(name = "getsstables", description = "Print the sstable filenames that own the key")
+    public static class GetSSTables extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 3, "getsstables requires ks, cf and key args");
+            String ks = args.get(0);
+            String cf = args.get(1);
+            String key = args.get(2);
+
+            List<String> sstables = probe.getSSTables(ks, cf, key);
+            for (String sstable : sstables)
+            {
+                System.out.println(sstable);
+            }
+        }
+    }
+
+    @Command(name = "gossipinfo", description = "Shows the gossip information for the cluster")
+    public static class GossipInfo extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println(probe.getGossipInfo());
+        }
+    }
+
+    @Command(name = "invalidatekeycache", description = "Invalidate the key cache")
+    public static class InvalidateKeyCache extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.invalidateKeyCache();
+        }
+    }
+
+    @Command(name = "invalidaterowcache", description = "Invalidate the row cache")
+    public static class InvalidateRowCache extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.invalidateRowCache();
+        }
+    }
+
+    @Command(name = "join", description = "Join the ring")
+    public static class Join extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkState(!probe.isJoined(), "This node has already joined the ring.");
+
+            try
+            {
+                probe.joinRing();
+            } catch (IOException e)
+            {
+                throw new RuntimeException("Error during joining the ring", e);
+            }
+        }
+    }
+
+    @Command(name = "move", description = "Move node on the token ring to a new token")
+    public static class Move extends NodeToolCmd
+    {
+        @Arguments(usage = "<new token>", description = "The new token. (for negative tokens)", required = true)
+        private String newToken = EMPTY;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            try
+            {
+                probe.move(newToken);
+            } catch (IOException e)
+            {
+                throw new RuntimeException("Error during moving node", e);
+            }
+        }
+    }
+
+    @Command(name = "pausehandoff", description = "Pause hints delivery process")
+    public static class PauseHandoff extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.pauseHintsDelivery();
+        }
+    }
+
+    @Command(name = "resumehandoff", description = "Resume hints delivery process")
+    public static class ResumeHandoff extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.resumeHintsDelivery();
+        }
+    }
+
+
+    @Command(name = "proxyhistograms", description = "Print statistic histograms for network operations")
+    public static class ProxyHistograms extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+            double[] readLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Read"));
+            double[] writeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Write"));
+            double[] rangeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("RangeSlice"));
+
+            System.out.println("proxy histograms");
+            System.out.println(format("%-10s%18s%18s%18s",
+                    "Percentile", "Read Latency", "Write Latency", "Range Latency"));
+            System.out.println(format("%-10s%18s%18s%18s",
+                    "", "(micros)", "(micros)", "(micros)"));
+            for (int i = 0; i < percentiles.length; i++)
+            {
+                System.out.println(format("%-10s%18.2f%18.2f%18.2f",
+                        percentiles[i],
+                        readLatency[i],
+                        writeLatency[i],
+                        rangeLatency[i]));
+            }
+            System.out.println();
+        }
+    }
+
+    @Command(name = "rebuild", description = "Rebuild data by streaming from other nodes (similarly to bootstrap)")
+    public static class Rebuild extends NodeToolCmd
+    {
+        @Arguments(usage = "<src-dc-name>", description = "Name of DC from which to select sources for streaming. By default, pick any DC")
+        private String sourceDataCenterName = null;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.rebuild(sourceDataCenterName);
+        }
+    }
+
+    @Command(name = "refresh", description = "Load newly placed SSTables to the system without restart")
+    public static class Refresh extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 2, "refresh requires ks and cf args");
+            probe.loadNewSSTables(args.get(0), args.get(1));
+        }
+    }
+
+    @Deprecated
+    @Command(name = "removetoken", description = "DEPRECATED (see removenode)", hidden = true)
+    public static class RemoveToken extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.err.println("Warn: removetoken is deprecated, please use removenode instead");
+        }
+    }
+
+    @Command(name = "removenode", description = "Show status of current node removal, force completion of pending removal or remove provided ID")
+    public static class RemoveNode extends NodeToolCmd
+    {
+        @Arguments(title = "remove_operation", usage = "<status>|<force>|<ID>", description = "The keyspace and column family name", required = true)
+        private String removeOperation = EMPTY;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            switch (removeOperation)
+            {
+                case "status":
+                    System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+                    break;
+                case "force":
+                    System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+                    probe.forceRemoveCompletion();
+                    break;
+                default:
+                    probe.removeNode(removeOperation);
+                    break;
+            }
+        }
+    }
+
+    @Command(name = "repair", description = "Repair one or more column families")
+    public static class Repair extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+        private List<String> args = new ArrayList<>();
+
+        @Option(title = "parallel", name = {"-par", "--parallel"}, description = "Use -par to carry out a parallel repair")
+        private boolean parallel = false;
+
+        @Option(title = "local_dc", name = {"-local", "--in-local-dc"}, description = "Use -local to only repair against nodes in the same datacenter")
+        private boolean localDC = false;
+
+        @Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
+        private List<String> specificDataCenters = new ArrayList<>();
+
+        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+        private String startToken = EMPTY;
+
+        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+        private String endToken = EMPTY;
+
+        @Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
+        private boolean primaryRange = false;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    Collection<String> dataCenters = null;
+                    if (!specificDataCenters.isEmpty())
+                        dataCenters = newArrayList(specificDataCenters);
+                    else if (localDC)
+                        dataCenters = newArrayList(probe.getDataCenter());
+
+                    if (!startToken.isEmpty() || !endToken.isEmpty())
+                        probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken);
+                    else
+                        probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, cfnames);
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during repair", e);
+                }
+            }
+        }
+    }
+
+    @Command(name = "setcachecapacity", description = "Set global key and row cache capacities (in MB units)")
+    public static class SetCacheCapacity extends NodeToolCmd
+    {
+        @Arguments(title = "<key-cache-capacity> <row-cache-capacity>", usage = "<key-cache-capacity> <row-cache-capacity>", description = "Key cache and row cache (in MB)", required = true)
+        private List<Integer> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 2, "setcachecapacity requires key-cache-capacity, and row-cache-capacity args.");
+            probe.setCacheCapacities(args.get(0), args.get(1));
+        }
+    }
+
+    @Command(name = "setcompactionthreshold", description = "Set min and max compaction thresholds for a given column family")
+    public static class SetCompactionThreshold extends NodeToolCmd
+    {
+        @Arguments(title = "<keyspace> <cfname> <minthreshold> <maxthreshold>", usage = "<keyspace> <cfname> <minthreshold> <maxthreshold>", description = "The keyspace, the column family, min and max threshold", required = true)
+        private List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 4, "setcompactionthreshold requires ks, cf, min, and max threshold args.");
+
+            int minthreshold = parseInt(args.get(2));
+            int maxthreshold = parseInt(args.get(3));
+            checkArgument(minthreshold >= 0 && maxthreshold >= 0, "Thresholds must be positive integers");
+            checkArgument(minthreshold <= maxthreshold, "Min threshold cannot be greater than max.");
+            checkArgument(minthreshold >= 2 || maxthreshold == 0, "Min threshold must be at least 2");
+
+            probe.setCompactionThreshold(args.get(0), args.get(1), minthreshold, maxthreshold);
+        }
+    }
+
+    @Command(name = "setcompactionthroughput", description = "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling")
+    public static class SetCompactionThroughput extends NodeToolCmd
+    {
+        @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+        private Integer compactionThroughput = null;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.setCompactionThroughput(compactionThroughput);
+        }
+    }
+
+    @Command(name = "setstreamthroughput", description = "Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling")
+    public static class SetStreamThroughput extends NodeToolCmd
+    {
+        @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+        private Integer streamThroughput = null;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.setStreamThroughput(streamThroughput);
+        }
+    }
+
+    @Command(name = "settraceprobability", description = "Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default")
+    public static class SetTraceProbability extends NodeToolCmd
+    {
+        @Arguments(title = "trace_probability", usage = "<value>", description = "Trace probability between 0 and 1 (ex: 0.2)", required = true)
+        private Double traceProbability = null;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(traceProbability >= 0 && traceProbability <= 1, "Trace probability must be between 0 and 1");
+            probe.setTraceProbability(traceProbability);
+        }
+    }
+
+    @Command(name = "snapshot", description = "Take a snapshot of specified keyspaces or a snapshot of the specified column family")
+    public static class Snapshot extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspaces...>]", description = "List of keyspaces. By default, all keyspaces")
+        private List<String> keyspaces = new ArrayList<>();
+
+        @Option(title = "cfname", name = {"-cf", "--column-family"}, description = "The column family name (you must specify one and only one keyspace for using this option)")
+        private String columnFamily = null;
+
+        @Option(title = "tag", name = {"-t", "--tag"}, description = "The name of the snapshot")
+        private String snapshotName = Long.toString(System.currentTimeMillis());
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            try
+            {
+                StringBuilder sb = new StringBuilder();
+
+                sb.append("Requested creating snapshot(s) for ");
+
+                if (keyspaces.isEmpty())
+                    sb.append("[all keyspaces]");
+                else
+                    sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+                if (!snapshotName.isEmpty())
+                    sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+                System.out.println(sb.toString());
+
+                probe.takeSnapshot(snapshotName, columnFamily, toArray(keyspaces, String.class));
+                System.out.println("Snapshot directory: " + snapshotName);
+            } catch (IOException e)
+            {
+                throw new RuntimeException("Error during taking a snapshot", e);
+            }
+        }
+    }
+
+    @Command(name = "status", description = "Print cluster information (state, load, IDs, ...)")
+    public static class Status extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace>]", description = "The keyspace name")
+        private String keyspace = null;
+
+        @Option(title = "resolve_ip", name = {"-r", "--resolve-ip"}, description = "Show node domain names instead of IPs")
+        private boolean resolveIp = false;
+
+        private boolean hasEffectiveOwns = false;
+        private boolean isTokenPerNode = true;
+        private int maxAddressLength = 0;
+        private String format = null;
+        private Collection<String> joiningNodes, leavingNodes, movingNodes, liveNodes, unreachableNodes;
+        private Map<String, String> loadMap, hostIDMap, tokensToEndpoints;
+        private EndpointSnitchInfoMBean epSnitchInfo;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            joiningNodes = probe.getJoiningNodes();
+            leavingNodes = probe.getLeavingNodes();
+            movingNodes = probe.getMovingNodes();
+            loadMap = probe.getLoadMap();
+            tokensToEndpoints = probe.getTokenToEndpointMap();
+            liveNodes = probe.getLiveNodes();
+            unreachableNodes = probe.getUnreachableNodes();
+            hostIDMap = probe.getHostIdMap();
+            epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+            SetHostStat ownerships;
+            try
+            {
+                ownerships = new SetHostStat(probe.effectiveOwnership(keyspace));
+                hasEffectiveOwns = true;
+            } catch (IllegalStateException e)
+            {
+                ownerships = new SetHostStat(probe.getOwnership());
+            }
+
+            // More tokens then nodes (aka vnodes)?
+            if (new HashSet<>(tokensToEndpoints.values()).size() < tokensToEndpoints.keySet().size())
+                isTokenPerNode = false;
+
+            Map<String, SetHostStat> dcs = getOwnershipByDc(probe, ownerships);
+
+            findMaxAddressLength(dcs);
+
+            // Datacenters
+            for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+            {
+                String dcHeader = String.format("Datacenter: %s%n", dc.getKey());
+                System.out.printf(dcHeader);
+                for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('=');
+                System.out.println();
+
+                // Legend
+                System.out.println("Status=Up/Down");
+                System.out.println("|/ State=Normal/Leaving/Joining/Moving");
+
+                printNodesHeader(hasEffectiveOwns, isTokenPerNode);
+
+                // Nodes
+                for (HostStat entry : dc.getValue())
+                    printNode(probe, entry, hasEffectiveOwns, isTokenPerNode);
+            }
+
+        }
+
+        private void findMaxAddressLength(Map<String, SetHostStat> dcs)
+        {
+            maxAddressLength = 0;
+            for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+            {
+                for (HostStat stat : dc.getValue())
+                {
+                    maxAddressLength = Math.max(maxAddressLength, stat.ipOrDns().length());
+                }
+            }
+        }
+
+        private void printNodesHeader(boolean hasEffectiveOwns, boolean isTokenPerNode)
+        {
+            String fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+            String owns = hasEffectiveOwns ? "Owns (effective)" : "Owns";
+
+            if (isTokenPerNode)
+                System.out.printf(fmt, "-", "-", "Address", "Load", owns, "Host ID", "Token", "Rack");
+            else
+                System.out.printf(fmt, "-", "-", "Address", "Load", "Tokens", owns, "Host ID", "Rack");
+        }
+
+        private void printNode(NodeProbe probe, HostStat hostStat, boolean hasEffectiveOwns, boolean isTokenPerNode)
+        {
+            String status, state, load, strOwns, hostID, rack, fmt;
+            fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+            String endpoint = hostStat.ip;
+            if (liveNodes.contains(endpoint)) status = "U";
+            else if (unreachableNodes.contains(endpoint)) status = "D";
+            else status = "?";
+            if (joiningNodes.contains(endpoint)) state = "J";
+            else if (leavingNodes.contains(endpoint)) state = "L";
+            else if (movingNodes.contains(endpoint)) state = "M";
+            else state = "N";
+
+            load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?";
+            strOwns = new DecimalFormat("##0.0%").format(hostStat.owns);
+            hostID = hostIDMap.get(endpoint);
+
+            try
+            {
+                rack = epSnitchInfo.getRack(endpoint);
+            } catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            if (isTokenPerNode)
+            {
+                System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, strOwns, hostID, probe.getTokens(endpoint).get(0), rack);
+            } else
+            {
+                int tokens = probe.getTokens(endpoint).size();
+                System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, tokens, strOwns, hostID, rack);
+            }
+        }
+
+        private String getFormat(
+                boolean hasEffectiveOwns,
+                boolean isTokenPerNode)
+        {
+            if (format == null)
+            {
+                StringBuilder buf = new StringBuilder();
+                String addressPlaceholder = String.format("%%-%ds  ", maxAddressLength);
+                buf.append("%s%s  ");                         // status
+                buf.append(addressPlaceholder);               // address
+                buf.append("%-9s  ");                         // load
+                if (!isTokenPerNode)
+                    buf.append("%-6s  ");                     // "Tokens"
+                if (hasEffectiveOwns)
+                    buf.append("%-16s  ");                    // "Owns (effective)"
+                else
+                    buf.append("%-6s  ");                     // "Owns
+                buf.append("%-36s  ");                        // Host ID
+                if (isTokenPerNode)
+                    buf.append("%-39s  ");                    // token
+                buf.append("%s%n");                           // "Rack"
+
+                format = buf.toString();
+            }
+
+            return format;
+        }
+
+        private Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, SetHostStat ownerships)
+        {
+            Map<String, SetHostStat> ownershipByDc = Maps.newLinkedHashMap();
+            EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+            try
+            {
+                for (HostStat ownership : ownerships)
+                {
+                    String dc = epSnitchInfo.getDatacenter(ownership.ip);
+                    if (!ownershipByDc.containsKey(dc))
+                        ownershipByDc.put(dc, new SetHostStat());
+                    ownershipByDc.get(dc).add(ownership);
+                }
+            } catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            return ownershipByDc;
+        }
+
+        class SetHostStat implements Iterable<HostStat>
+        {
+            final List<HostStat> hostStats = new ArrayList<>();
+
+            public SetHostStat()
+            {
+            }
+
+            public SetHostStat(Map<InetAddress, Float> ownerships)
+            {
+                for (Map.Entry<InetAddress, Float> entry : ownerships.entrySet())
+                {
+                    hostStats.add(new HostStat(entry));
+                }
+            }
+
+            @Override
+            public Iterator<HostStat> iterator()
+            {
+                return hostStats.iterator();
+            }
+
+            public void add(HostStat entry)
+            {
+                hostStats.add(entry);
+            }
+        }
+
+        class HostStat
+        {
+            public final String ip;
+            public final String dns;
+            public final Float owns;
+
+            public HostStat(Map.Entry<InetAddress, Float> ownership)
+            {
+                this.ip = ownership.getKey().getHostAddress();
+                this.dns = ownership.getKey().getHostName();
+                this.owns = ownership.getValue();
+            }
+
+            public String ipOrDns()
+            {
+                if (resolveIp)
+                {
+                    return dns;
+                }
+                return ip;
+            }
+        }
+    }
+
+    @Command(name = "statusbinary", description = "Status of native transport (binary protocol)")
+    public static class StatusBinary extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println(
+                    probe.isNativeTransportRunning()
+                    ? "running"
+                    : "not running");
+        }
+    }
+
+    @Command(name = "statusthrift", description = "Status of thrift server")
+    public static class StatusThrift extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println(
+                    probe.isThriftServerRunning()
+                    ? "running"
+                    : "not running");
+        }
+    }
+
+    @Command(name = "stop", description = "Stop compaction")
+    public static class Stop extends NodeToolCmd
+    {
+        @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD", required = true)
+        private OperationType compactionType = OperationType.UNKNOWN;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.stop(compactionType.name());
+        }
+    }
+
+    @Command(name = "stopdaemon", description = "Stop cassandra daemon")
+    public static class StopDaemon extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            try
+            {
+                probe.stopCassandraDaemon();
+            } catch (Exception ignored)
+            {
+                // ignored
+            }
+            System.out.println("Cassandra has shutdown.");
+        }
+    }
+
+    @Command(name = "version", description = "Print cassandra version")
+    public static class Version extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("ReleaseVersion: " + probe.getReleaseVersion());
+        }
+    }
+
+    @Command(name = "describering", description = "Shows the token ranges info of a given keyspace")
+    public static class DescribeRing extends NodeToolCmd
+    {
+        @Arguments(description = "The keyspace name", required = true)
+        String keyspace = EMPTY;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("Schema Version:" + probe.getSchemaVersion());
+            System.out.println("TokenRange: ");
+            try
+            {
+                for (String tokenRangeString : probe.describeRing(keyspace))
+                {
+                    System.out.println("\t" + tokenRangeString);
+                }
+            } catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Command(name = "rangekeysample", description = "Shows the sampled keys held across all keyspaces")
+    public static class RangeKeySample extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            System.out.println("RangeKeySample: ");
+            List<String> tokenStrings = probe.sampleKeyRange();
+            for (String tokenString : tokenStrings)
+            {
+                System.out.println("\t" + tokenString);
+            }
+        }
+    }
+
+    @Command(name = "rebuild_index", description = "A full rebuilds of native secondry index for a given column family")
+    public static class RebuildIndex extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname> [<indexName...>]", description = "The keyspace and column family name followed by an optional list of index names (IndexNameExample: Standard3.IdxName Standard3.IdxName1)")
+        List<String> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() >= 2, "rebuild_index requires ks and cf args");
+
+            List<String> indexNames = new ArrayList<>();
+            if (args.size() > 2)
+            {
+                indexNames.addAll(args.subList(2, args.size()));
+            }
+
+            probe.rebuildIndex(args.get(0), args.get(1), toArray(indexNames, String.class));
+        }
+    }
+
+    @Command(name = "resetlocalschema", description = "Reset node's local schema and resync")
+    public static class ResetLocalSchema extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            try
+            {
+                probe.resetLocalSchema();
+            } catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Command(name = "enablebackup", description = "Enable incremental backup")
+    public static class EnableBackup extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.setIncrementalBackupsEnabled(true);
+        }
+    }
+
+    @Command(name = "disablebackup", description = "Disable incremental backup")
+    public static class DisableBackup extends NodeToolCmd
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            probe.setIncrementalBackupsEnabled(false);
+        }
+    }
+
+    @Command(name = "setcachekeystosave", description = "Set number of keys saved by each cache for faster post-restart warmup. 0 to disable")
+    public static class SetCacheKeysToSave extends NodeToolCmd
+    {
+        @Arguments(title = "<key-cache-keys-to-save> <row-cache-keys-to-save>", usage = "<key-cache-keys-to-save> <row-cache-keys-to-save>", description = "The number of keys saved by each cache. 0 to disable", required = true)
+        private List<Integer> args = new ArrayList<>();
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size()

<TRUNCATED>

[3/3] git commit: refactor nodetool, encapsulating each command into a subclass patch by Clément Lardeur; reviewed by Mikhail Stepura for CASSANDRA-6381

Posted by jb...@apache.org.
refactor nodetool, encapsulating each command into a subclass
patch by Clément Lardeur; reviewed by Mikhail Stepura for CASSANDRA-6381


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/892d8e69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/892d8e69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/892d8e69

Branch: refs/heads/trunk
Commit: 892d8e699cf5ca3807da288bd08c73319c35c3b8
Parents: eca02fd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jan 10 12:08:04 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jan 10 12:12:34 2014 -0600

----------------------------------------------------------------------
 bin/nodetool                                    |    2 +-
 build.xml                                       |    2 +
 .../org/apache/cassandra/tools/NodeCmd.java     | 1777 ---------------
 .../org/apache/cassandra/tools/NodeProbe.java   |    2 +-
 .../org/apache/cassandra/tools/NodeTool.java    | 2144 ++++++++++++++++++
 .../apache/cassandra/tools/NodeToolHelp.java    |   36 -
 .../apache/cassandra/tools/NodeToolHelp.yaml    |  217 --
 7 files changed, 2148 insertions(+), 2032 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/bin/nodetool
----------------------------------------------------------------------
diff --git a/bin/nodetool b/bin/nodetool
index 8642c0d..36a7016 100755
--- a/bin/nodetool
+++ b/bin/nodetool
@@ -90,6 +90,6 @@ esac
       -Xmx32m \
       -Dlogback.configurationFile=logback-tools.xml \
       -Dstorage-config="$CASSANDRA_CONF" \
-      org.apache.cassandra.tools.NodeCmd -p $JMX_PORT $ARGS
+      org.apache.cassandra.tools.NodeTool -p $JMX_PORT $ARGS
 
 # vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 347323f..2e4719e 100644
--- a/build.xml
+++ b/build.xml
@@ -384,6 +384,7 @@
           <dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
           <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
+          <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
           <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.1" />
@@ -418,6 +419,7 @@
         <dependency groupId="org.apache.pig" artifactId="pig"/>
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
+        <dependency groupId="io.airlift" artifactId="airline"/>
       </artifact:pom>
 
       <artifact:pom id="coverage-deps-pom"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
deleted file mode 100644
index f2d56ae..0000000
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ /dev/null
@@ -1,1777 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tools;
-
-import java.io.*;
-import java.lang.management.MemoryUsage;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.DecimalFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import javax.management.openmbean.TabularData;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.yammer.metrics.reporting.JmxReporter;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.commons.cli.*;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
-import org.apache.cassandra.db.ColumnFamilyStoreMBean;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.CompactionManagerMBean;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
-import org.apache.cassandra.net.MessagingServiceMBean;
-import org.apache.cassandra.service.CacheServiceMBean;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.SessionInfo;
-import org.apache.cassandra.utils.Pair;
-
-public class NodeCmd
-{
-    private static final String HISTORYFILE = "nodetool.history";
-    private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = Pair.create("cf", "column-family");
-    private static final Pair<String, String> HOST_OPT = Pair.create("h", "host");
-    private static final Pair<String, String> PORT_OPT = Pair.create("p", "port");
-    private static final Pair<String, String> USERNAME_OPT = Pair.create("u", "username");
-    private static final Pair<String, String> PASSWORD_OPT = Pair.create("pw", "password");
-    private static final Pair<String, String> TAG_OPT = Pair.create("t", "tag");
-    private static final Pair<String, String> TOKENS_OPT = Pair.create("T", "tokens");
-    private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
-    private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
-    private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
-    private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
-    private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
-    private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
-    private static final Pair<String, String> UPGRADE_ALL_SSTABLE_OPT = Pair.create("a", "include-all-sstables");
-    private static final Pair<String, String> NO_SNAPSHOT = Pair.create("ns", "no-snapshot");
-    private static final Pair<String, String> CFSTATS_IGNORE_OPT = Pair.create("i", "ignore");
-    private static final Pair<String, String> RESOLVE_IP = Pair.create("r", "resolve-ip");
-
-    private static final String DEFAULT_HOST = "127.0.0.1";
-    private static final int DEFAULT_PORT = 7199;
-
-    private static final ToolOptions options = new ToolOptions();
-
-    private final NodeProbe probe;
-
-    static
-    {
-        options.addOption(SNAPSHOT_COLUMNFAMILY_OPT, true, "only take a snapshot of the specified table (column family)");
-        options.addOption(HOST_OPT,     true, "node hostname or ip address");
-        options.addOption(PORT_OPT,     true, "remote jmx agent port number");
-        options.addOption(USERNAME_OPT, true, "remote jmx agent username");
-        options.addOption(PASSWORD_OPT, true, "remote jmx agent password");
-        options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
-        options.addOption(TOKENS_OPT,   false, "display all tokens");
-        options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
-        options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
-        options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
-        options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
-        options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
-        options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
-        options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables");
-        options.addOption(NO_SNAPSHOT, false, "disables snapshot creation for scrub");
-        options.addOption(CFSTATS_IGNORE_OPT, false, "ignore the supplied list of keyspace.columnfamiles in statistics");
-        options.addOption(RESOLVE_IP, false, "show node domain names instead of IPs");
-    }
-
-    public NodeCmd(NodeProbe probe)
-    {
-        this.probe = probe;
-    }
-
-    private enum NodeCommand
-    {
-        CFHISTOGRAMS,
-        CFSTATS,
-        CLEANUP,
-        CLEARSNAPSHOT,
-        COMPACT,
-        COMPACTIONSTATS,
-        COMPACTIONHISTORY,
-        DECOMMISSION,
-        DESCRIBECLUSTER,
-        DISABLEBINARY,
-        DISABLEGOSSIP,
-        DISABLEHANDOFF,
-        DISABLETHRIFT,
-        DRAIN,
-        ENABLEBINARY,
-        ENABLEGOSSIP,
-        ENABLEHANDOFF,
-        ENABLETHRIFT,
-        FLUSH,
-        GETCOMPACTIONTHRESHOLD,
-        DISABLEAUTOCOMPACTION,
-        ENABLEAUTOCOMPACTION,
-        GETCOMPACTIONTHROUGHPUT,
-        GETSTREAMTHROUGHPUT,
-        GETENDPOINTS,
-        GETSSTABLES,
-        GOSSIPINFO,
-        HELP,
-        INFO,
-        INVALIDATEKEYCACHE,
-        INVALIDATEROWCACHE,
-        JOIN,
-        MOVE,
-        NETSTATS,
-        PAUSEHANDOFF,
-        PROXYHISTOGRAMS,
-        REBUILD,
-        REFRESH,
-        REMOVETOKEN,
-        REMOVENODE,
-        REPAIR,
-        RESUMEHANDOFF,
-        RING,
-        SCRUB,
-        SETCACHECAPACITY,
-        SETCOMPACTIONTHRESHOLD,
-        SETCOMPACTIONTHROUGHPUT,
-        SETSTREAMTHROUGHPUT,
-        SETTRACEPROBABILITY,
-        SNAPSHOT,
-        STATUS,
-        STATUSBINARY,
-        STATUSTHRIFT,
-        STOP,
-        STOPDAEMON,
-        TPSTATS,
-        TRUNCATEHINTS,
-        UPGRADESSTABLES,
-        VERSION,
-        DESCRIBERING,
-        RANGEKEYSAMPLE,
-        REBUILD_INDEX,
-        RESETLOCALSCHEMA,
-        ENABLEBACKUP,
-        DISABLEBACKUP,
-        SETCACHEKEYSTOSAVE,
-        RELOADTRIGGERS
-    }
-
-
-    /**
-     * Prints usage information to stdout.
-     */
-    private static void printUsage()
-    {
-        HelpFormatter hf = new HelpFormatter();
-        StringBuilder header = new StringBuilder(512);
-        header.append("\nAvailable commands\n");
-        final NodeToolHelp ntHelp = loadHelp();
-        Collections.sort(ntHelp.commands, new Comparator<NodeToolHelp.NodeToolCommand>() 
-        {
-            @Override
-            public int compare(NodeToolHelp.NodeToolCommand o1, NodeToolHelp.NodeToolCommand o2) 
-            {
-                return o1.name.compareTo(o2.name);
-            }
-        });
-        for(NodeToolHelp.NodeToolCommand cmd : ntHelp.commands)
-            addCmdHelp(header, cmd);
-        String usage = String.format("java %s --host <arg> <command>%n", NodeCmd.class.getName());
-        hf.printHelp(usage, "", options, "");
-        System.out.println(header.toString());
-    }
-
-    private static NodeToolHelp loadHelp()
-    {
-        final InputStream is = NodeCmd.class.getClassLoader().getResourceAsStream("org/apache/cassandra/tools/NodeToolHelp.yaml");
-        assert is != null;
-
-        try
-        {
-            final Constructor constructor = new Constructor(NodeToolHelp.class);
-            final Yaml yaml = new Yaml(constructor);
-            return (NodeToolHelp)yaml.load(is);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(is);
-        }
-    }
-
-    private static void addCmdHelp(StringBuilder sb, NodeToolHelp.NodeToolCommand cmd)
-    {
-        sb.append("  ").append(cmd.name);
-        // Ghetto indentation (trying, but not too hard, to not look too bad)
-        if (cmd.name.length() <= 20)
-            for (int i = cmd.name.length(); i < 22; ++i) sb.append(" ");
-        sb.append(" - ").append(cmd.help);
-  }
-
-
-    /**
-     * Write a textual representation of the Cassandra ring.
-     *
-     * @param outs
-     *            the stream to write to
-     */
-    public void printRing(PrintStream outs, String keyspace)
-    {
-        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
-        LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
-        for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
-            endpointsToTokens.put(entry.getValue(), entry.getKey());
-
-        int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>() {
-            @Override
-            public int compare(String first, String second)
-            {
-                return ((Integer)first.length()).compareTo((Integer)second.length());
-            }
-        }).length();
-
-        String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
-        String format = String.format(formatPlaceholder, maxAddressLength);
-
-        // Calculate per-token ownership of the ring
-        Map<InetAddress, Float> ownerships;
-        boolean keyspaceSelected;
-        try
-        {
-            ownerships = probe.effectiveOwnership(keyspace);
-            keyspaceSelected = true;
-        }
-        catch (IllegalStateException ex)
-        {
-            ownerships = probe.getOwnership();
-            outs.printf("Note: Ownership information does not include topology; for complete information, specify a keyspace%n");
-            keyspaceSelected = false;
-        }
-        try
-        {
-            outs.println();
-            Map<String, Map<InetAddress, Float>> perDcOwnerships = Maps.newLinkedHashMap();
-            // get the different datasets and map to tokens
-            for (Map.Entry<InetAddress, Float> ownership : ownerships.entrySet())
-            {
-                String dc = probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress());
-                if (!perDcOwnerships.containsKey(dc))
-                    perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, Float>());
-                perDcOwnerships.get(dc).put(ownership.getKey(), ownership.getValue());
-            }
-            for (Map.Entry<String, Map<InetAddress, Float>> entry : perDcOwnerships.entrySet())
-                printDc(outs, format, entry.getKey(), endpointsToTokens, keyspaceSelected, entry.getValue());
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        if(DatabaseDescriptor.getNumTokens() > 1)
-        {
-            outs.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
-            outs.println("  To view status related info of a node use \"nodetool status\" instead.\n");
-        }
-    }
-
-    private void printDc(PrintStream outs, String format, String dc, LinkedHashMultimap<String, String> endpointsToTokens,
-            boolean keyspaceSelected, Map<InetAddress, Float> filteredOwnerships)
-    {
-        Collection<String> liveNodes = probe.getLiveNodes();
-        Collection<String> deadNodes = probe.getUnreachableNodes();
-        Collection<String> joiningNodes = probe.getJoiningNodes();
-        Collection<String> leavingNodes = probe.getLeavingNodes();
-        Collection<String> movingNodes = probe.getMovingNodes();
-        Map<String, String> loadMap = probe.getLoadMap();
-
-        outs.println("Datacenter: " + dc);
-        outs.println("==========");
-
-        // get the total amount of replicas for this dc and the last token in this dc's ring
-        List<String> tokens = new ArrayList<String>();
-        String lastToken = "";
-
-        for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
-        {
-            tokens.addAll(endpointsToTokens.get(entry.getKey().getHostAddress()));
-            lastToken = tokens.get(tokens.size() - 1);
-        }
-
-
-        outs.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token");
-
-        if (filteredOwnerships.size() > 1)
-            outs.printf(format, "", "", "", "", "", "", lastToken);
-        else
-            outs.println();
-
-        for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
-        {
-            String endpoint = entry.getKey().getHostAddress();
-            for (String token : endpointsToTokens.get(endpoint))
-            {
-                String rack;
-                try
-                {
-                    rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
-                }
-                catch (UnknownHostException e)
-                {
-                    rack = "Unknown";
-                }
-
-                String status = liveNodes.contains(endpoint)
-                        ? "Up"
-                        : deadNodes.contains(endpoint)
-                                ? "Down"
-                                : "?";
-
-                String state = "Normal";
-
-                if (joiningNodes.contains(endpoint))
-                    state = "Joining";
-                else if (leavingNodes.contains(endpoint))
-                    state = "Leaving";
-                else if (movingNodes.contains(endpoint))
-                    state = "Moving";
-
-                String load = loadMap.containsKey(endpoint)
-                        ? loadMap.get(endpoint)
-                        : "?";
-                String owns = new DecimalFormat("##0.00%").format(entry.getValue());
-                outs.printf(format, endpoint, rack, status, state, load, owns, token);
-            }
-        }
-        outs.println();
-    }
-
-    private class ClusterStatus
-    {
-        String kSpace = null, format = null;
-        int maxAddressLength;
-        Collection<String> joiningNodes, leavingNodes, movingNodes, liveNodes, unreachableNodes;
-        Map<String, String> loadMap, hostIDMap, tokensToEndpoints;
-        EndpointSnitchInfoMBean epSnitchInfo;
-        PrintStream outs;
-        private final boolean resolveIp;
-
-        ClusterStatus(PrintStream outs, String kSpace, boolean resolveIp)
-        {
-            this.kSpace = kSpace;
-            this.outs = outs;
-            this.resolveIp = resolveIp;
-            joiningNodes = probe.getJoiningNodes();
-            leavingNodes = probe.getLeavingNodes();
-            movingNodes = probe.getMovingNodes();
-            loadMap = probe.getLoadMap();
-            tokensToEndpoints = probe.getTokenToEndpointMap();
-            liveNodes = probe.getLiveNodes();
-            unreachableNodes = probe.getUnreachableNodes();
-            hostIDMap = probe.getHostIdMap();
-            epSnitchInfo = probe.getEndpointSnitchInfoProxy();
-        }
-
-        private void printStatusLegend()
-        {
-            outs.println("Status=Up/Down");
-            outs.println("|/ State=Normal/Leaving/Joining/Moving");
-        }
-
-        class SetHostStat implements Iterable<HostStat> {
-            final List<HostStat> hostStats = new ArrayList<HostStat>();
-
-            public SetHostStat() {}
-
-            public SetHostStat(Map<InetAddress, Float> ownerships) {
-                for (Map.Entry<InetAddress, Float> entry : ownerships.entrySet()) {
-                    hostStats.add(new HostStat(entry));
-                }
-            }
-
-            @Override
-            public Iterator<HostStat> iterator() {
-                return hostStats.iterator();
-            }
-
-            public void add(HostStat entry) {
-                hostStats.add(entry);
-            }
-        }
-
-        class HostStat {
-            public final String ip;
-            public final String dns;
-            public final Float owns;
-
-            public HostStat(Map.Entry<InetAddress, Float> ownership) {
-                this.ip = ownership.getKey().getHostAddress();
-                this.dns = ownership.getKey().getHostName();
-                this.owns = ownership.getValue();
-            }
-
-            public String ipOrDns() {
-                if (resolveIp) {
-                    return dns;
-                }
-                return ip;
-            }
-        }
-
-        private Map<String, SetHostStat> getOwnershipByDc(SetHostStat ownerships)
-        throws UnknownHostException
-        {
-            Map<String, SetHostStat> ownershipByDc = Maps.newLinkedHashMap();
-            EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy();
-
-            for (HostStat ownership : ownerships)
-            {
-                String dc = epSnitchInfo.getDatacenter(ownership.ip);
-                if (!ownershipByDc.containsKey(dc))
-                    ownershipByDc.put(dc, new SetHostStat());
-                ownershipByDc.get(dc).add(ownership);
-            }
-
-            return ownershipByDc;
-        }
-
-        private String getFormat(boolean hasEffectiveOwns, boolean isTokenPerNode)
-        {
-            if (format == null)
-            {
-                StringBuilder buf = new StringBuilder();
-                String addressPlaceholder = String.format("%%-%ds  ", maxAddressLength);
-                buf.append("%s%s  ");                         // status
-                buf.append(addressPlaceholder);               // address
-                buf.append("%-9s  ");                         // load
-                if (!isTokenPerNode)  buf.append("%-6s  ");   // "Tokens"
-                if (hasEffectiveOwns) buf.append("%-16s  ");  // "Owns (effective)"
-                else                  buf.append("%-5s  ");   // "Owns
-                buf.append("%-36s  ");                        // Host ID
-                if (isTokenPerNode)   buf.append("%-39s  ");  // token
-                buf.append("%s%n");                           // "Rack"
-
-                format = buf.toString();
-            }
-
-            return format;
-        }
-
-        private void printNode(HostStat hostStat,
-                boolean hasEffectiveOwns, boolean isTokenPerNode) throws UnknownHostException
-        {
-            String status, state, load, strOwns, hostID, rack, fmt;
-            fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
-            String endpoint = hostStat.ip;
-            if      (liveNodes.contains(endpoint))        status = "U";
-            else if (unreachableNodes.contains(endpoint)) status = "D";
-            else                                          status = "?";
-            if      (joiningNodes.contains(endpoint))     state = "J";
-            else if (leavingNodes.contains(endpoint))     state = "L";
-            else if (movingNodes.contains(endpoint))      state = "M";
-            else                                          state = "N";
-
-            load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?";
-            strOwns = new DecimalFormat("##0.0%").format(hostStat.owns);
-            hostID = hostIDMap.get(endpoint);
-            rack = epSnitchInfo.getRack(endpoint);
-
-            if (isTokenPerNode)
-            {
-                outs.printf(fmt, status, state, hostStat.ipOrDns(), load, strOwns, hostID, probe.getTokens(endpoint).get(0), rack);
-            }
-            else
-            {
-                int tokens = probe.getTokens(endpoint).size();
-                outs.printf(fmt, status, state, hostStat.ipOrDns(), load, tokens, strOwns, hostID, rack);
-            }
-        }
-
-        private void printNodesHeader(boolean hasEffectiveOwns, boolean isTokenPerNode)
-        {
-            String fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
-            String owns = hasEffectiveOwns ? "Owns (effective)" : "Owns";
-
-            if (isTokenPerNode)
-                outs.printf(fmt, "-", "-", "Address", "Load", owns, "Host ID", "Token", "Rack");
-            else
-                outs.printf(fmt, "-", "-", "Address", "Load", "Tokens", owns, "Host ID", "Rack");
-        }
-
-        void findMaxAddressLength(Map<String, SetHostStat> dcs) {
-            maxAddressLength = 0;
-            for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
-            {
-                for (HostStat stat : dc.getValue()) {
-                    maxAddressLength = Math.max(maxAddressLength, stat.ipOrDns().length());
-                }
-            }
-        }
-
-        void print() throws UnknownHostException
-        {
-            SetHostStat ownerships;
-            boolean hasEffectiveOwns = false, isTokenPerNode = true;
-
-            try
-            {
-                ownerships = new SetHostStat(probe.effectiveOwnership(kSpace));
-                hasEffectiveOwns = true;
-            }
-            catch (IllegalStateException e)
-            {
-                ownerships = new SetHostStat(probe.getOwnership());
-            }
-
-            // More tokens then nodes (aka vnodes)?
-            if (new HashSet<String>(tokensToEndpoints.values()).size() < tokensToEndpoints.keySet().size())
-                isTokenPerNode = false;
-
-            Map<String, SetHostStat> dcs = getOwnershipByDc(ownerships);
-
-            findMaxAddressLength(dcs);
-
-            // Datacenters
-            for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
-            {
-                String dcHeader = String.format("Datacenter: %s%n", dc.getKey());
-                outs.printf(dcHeader);
-                for (int i=0; i < (dcHeader.length() - 1); i++) outs.print('=');
-                outs.println();
-
-                printStatusLegend();
-                printNodesHeader(hasEffectiveOwns, isTokenPerNode);
-
-                // Nodes
-                for (HostStat entry : dc.getValue())
-                    printNode(entry, hasEffectiveOwns, isTokenPerNode);
-            }
-        }
-    }
-
-    /** Writes a keyspaceName of cluster-wide node information to a PrintStream
-     * @throws UnknownHostException */
-    public void printClusterStatus(PrintStream outs, String keyspace, boolean resolveIp) throws UnknownHostException
-    {
-        new ClusterStatus(outs, keyspace, resolveIp).print();
-    }
-
-    public void printThreadPoolStats(PrintStream outs)
-    {
-        outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked");
-
-        Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> threads = probe.getThreadPoolMBeanProxies();
-        while (threads.hasNext())
-        {
-            Entry<String, JMXEnabledThreadPoolExecutorMBean> thread = threads.next();
-            String poolName = thread.getKey();
-            JMXEnabledThreadPoolExecutorMBean threadPoolProxy = thread.getValue();
-            outs.printf("%-25s%10s%10s%15s%10s%18s%n",
-                        poolName,
-                        threadPoolProxy.getActiveCount(),
-                        threadPoolProxy.getPendingTasks(),
-                        threadPoolProxy.getCompletedTasks(),
-                        threadPoolProxy.getCurrentlyBlockedTasks(),
-                        threadPoolProxy.getTotalBlockedTasks());
-        }
-
-        outs.printf("%n%-20s%10s%n", "Message type", "Dropped");
-        for (Entry<String, Integer> entry : probe.getDroppedMessages().entrySet())
-            outs.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
-    }
-
-    /**
-     * Write node information.
-     *
-     * @param outs the stream to write to
-     */
-    public void printInfo(PrintStream outs, ToolCommandLine cmd)
-    {
-        boolean gossipInitialized = probe.isInitialized();
-        List<String> toks = probe.getTokens();
-
-        // If there is just 1 token, print it now like we always have, otherwise,
-        // require that -T/--tokens be passed (that output is potentially verbose).
-        if (toks.size() == 1)
-            outs.printf("%-17s: %s%n", "Token", toks.get(0));
-        else if (!cmd.hasOption(TOKENS_OPT.left))
-            outs.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", toks.size());
-
-        outs.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
-        outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
-        outs.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
-        outs.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
-        outs.printf("%-17s: %s%n", "Load", probe.getLoadString());
-        if (gossipInitialized)
-            outs.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
-        else
-            outs.printf("%-17s: %s%n", "Generation No", 0);
-
-        // Uptime
-        long secondsUp = probe.getUptime() / 1000;
-        outs.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
-
-        // Memory usage
-        MemoryUsage heapUsage = probe.getHeapMemoryUsage();
-        double memUsed = (double)heapUsage.getUsed() / (1024 * 1024);
-        double memMax = (double)heapUsage.getMax() / (1024 * 1024);
-        outs.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
-
-        // Data Center/Rack
-        outs.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
-        outs.printf("%-17s: %s%n", "Rack", probe.getRack());
-
-        // Exceptions
-        outs.printf("%-17s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
-
-        CacheServiceMBean cacheService = probe.getCacheServiceMBean();
-
-        // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-        outs.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
-                    "Key Cache",
-                    probe.getCacheMetric("KeyCache", "Entries"),
-                    probe.getCacheMetric("KeyCache", "Size"),
-                    probe.getCacheMetric("KeyCache", "Capacity"),
-                    probe.getCacheMetric("KeyCache", "Hits"),
-                    probe.getCacheMetric("KeyCache", "Requests"),
-                    probe.getCacheMetric("KeyCache", "HitRate"),
-                    cacheService.getKeyCacheSavePeriodInSeconds());
-
-        // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-        outs.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
-                    "Row Cache",
-                    probe.getCacheMetric("RowCache", "Entries"),
-                    probe.getCacheMetric("RowCache", "Size"),
-                    probe.getCacheMetric("RowCache", "Capacity"),
-                    probe.getCacheMetric("RowCache", "Hits"),
-                    probe.getCacheMetric("RowCache", "Requests"),
-                    probe.getCacheMetric("RowCache", "HitRate"),
-                    cacheService.getRowCacheSavePeriodInSeconds());
-
-        if (toks.size() > 1 && cmd.hasOption(TOKENS_OPT.left))
-        {
-            for (String tok : toks)
-                outs.printf("%-17s: %s%n", "Token", tok);
-        }
-    }
-
-    public void printReleaseVersion(PrintStream outs)
-    {
-        outs.println("ReleaseVersion: " + probe.getReleaseVersion());
-    }
-
-    public void printNetworkStats(final InetAddress addr, PrintStream outs)
-    {
-        outs.printf("Mode: %s%n", probe.getOperationMode());
-        Set<StreamState> statuses = probe.getStreamStatus();
-        if (statuses.isEmpty())
-            outs.println("Not sending any streams.");
-        for (StreamState status : statuses)
-        {
-            outs.printf("%s %s%n", status.description, status.planId.toString());
-            for (SessionInfo info : status.sessions)
-            {
-                outs.printf("    %s%n", info.peer.toString());
-                if (!info.receivingSummaries.isEmpty())
-                {
-                    outs.printf("        Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
-                    for (ProgressInfo progress : info.getReceivingFiles())
-                    {
-                        outs.printf("            %s%n", progress.toString());
-                    }
-                }
-                if (!info.sendingSummaries.isEmpty())
-                {
-                    outs.printf("        Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
-                    for (ProgressInfo progress : info.getSendingFiles())
-                    {
-                        outs.printf("            %s%n", progress.toString());
-                    }
-                }
-            }
-        }
-
-        outs.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
-
-        MessagingServiceMBean ms = probe.msProxy;
-        outs.printf("%-25s", "Pool Name");
-        outs.printf("%10s", "Active");
-        outs.printf("%10s", "Pending");
-        outs.printf("%15s%n", "Completed");
-
-        int pending;
-        long completed;
-
-        pending = 0;
-        for (int n : ms.getCommandPendingTasks().values())
-            pending += n;
-        completed = 0;
-        for (long n : ms.getCommandCompletedTasks().values())
-            completed += n;
-        outs.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed);
-
-        pending = 0;
-        for (int n : ms.getResponsePendingTasks().values())
-            pending += n;
-        completed = 0;
-        for (long n : ms.getResponseCompletedTasks().values())
-            completed += n;
-        outs.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed);
-    }
-
-    public void printCompactionStats(PrintStream outs)
-    {
-        int compactionThroughput = probe.getCompactionThroughput();
-        CompactionManagerMBean cm = probe.getCompactionManagerProxy();
-        outs.println("pending tasks: " + probe.getCompactionMetric("PendingTasks"));
-        if (cm.getCompactions().size() > 0)
-            outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
-        long remainingBytes = 0;
-        for (Map<String, String> c : cm.getCompactions())
-        {
-            String percentComplete = new Long(c.get("total")) == 0
-                                   ? "n/a"
-                                   : new DecimalFormat("0.00").format((double) new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%";
-            outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), c.get("unit"), percentComplete);
-            if (c.get("taskType").equals(OperationType.COMPACTION.toString()))
-                remainingBytes += (new Long(c.get("total")) - new Long(c.get("completed")));
-        }
-        long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes == 0
-                        ? -1
-                        : (remainingBytes) / (long) (1024L * 1024L * compactionThroughput);
-        String remainingTime = remainingTimeInSecs < 0
-                        ? "n/a"
-                        : String.format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
-
-        outs.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
-    }
-
-    /**
-     * Print the compaction threshold
-     *
-     * @param outs the stream to write to
-     */
-    public void printCompactionThreshold(PrintStream outs, String ks, String cf)
-    {
-        ColumnFamilyStoreMBean cfsProxy = probe.getCfsProxy(ks, cf);
-        outs.println("Current compaction thresholds for " + ks + "/" + cf + ": \n" +
-                     " min = " + cfsProxy.getMinimumCompactionThreshold() + ", " +
-                     " max = " + cfsProxy.getMaximumCompactionThreshold());
-    }
-
-    /**
-     * Print the compaction throughput
-     *
-     * @param outs the stream to write to
-     */
-    public void printCompactionThroughput(PrintStream outs)
-    {
-        outs.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
-    }
-
-    /**
-     * Print the stream throughput
-     *
-     * @param outs the stream to write to
-     */
-    public void printStreamThroughput(PrintStream outs)
-    {
-        outs.println("Current stream throughput: " + probe.getStreamThroughput() + " MB/s");
-    }
-
-    /**
-     * Print the name, snitch, partitioner and schema version(s) of a cluster
-     *
-     * @param outs Output stream
-     * @param host Server address
-     */
-    public void printClusterDescription(PrintStream outs, String host)
-    {
-        // display cluster name, snitch and partitioner
-        outs.println("Cluster Information:");
-        outs.println("\tName: " + probe.getClusterName());
-        outs.println("\tSnitch: " + probe.getEndpointSnitchInfoProxy().getSnitchName());
-        outs.println("\tPartitioner: " + probe.getPartitioner());
-
-        // display schema version for each node
-        outs.println("\tSchema versions:");
-        Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions();
-        for (String version : schemaVersions.keySet())
-        {
-            outs.println(String.format("\t\t%s: %s%n", version, schemaVersions.get(version)));
-        }
-    }
-
-    public void printColumnFamilyStats(PrintStream outs, boolean ignoreMode, String [] filterList)
-    {
-        OptionFilter filter = new OptionFilter(ignoreMode, filterList);
-        Map <String, List <ColumnFamilyStoreMBean>> cfstoreMap = new HashMap <String, List <ColumnFamilyStoreMBean>>();
-
-        // get a list of column family stores
-        Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
-
-        while (cfamilies.hasNext())
-        {
-            Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
-            String keyspaceName = entry.getKey();
-            ColumnFamilyStoreMBean cfsProxy = entry.getValue();
-
-            if (!cfstoreMap.containsKey(keyspaceName) && filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
-            {
-                List<ColumnFamilyStoreMBean> columnFamilies = new ArrayList<ColumnFamilyStoreMBean>();
-                columnFamilies.add(cfsProxy);
-                cfstoreMap.put(keyspaceName, columnFamilies);
-            }
-            else if (filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
-            {
-                cfstoreMap.get(keyspaceName).add(cfsProxy);
-            }
-        }
-
-        // make sure all specified kss and cfs exist
-        filter.verifyKeyspaces(probe.getKeyspaces());
-        filter.verifyColumnFamilies();
-
-        // print out the table statistics
-        for (Entry<String, List<ColumnFamilyStoreMBean>> entry : cfstoreMap.entrySet())
-        {
-            String keyspaceName = entry.getKey();
-            List<ColumnFamilyStoreMBean> columnFamilies = entry.getValue();
-            long keyspaceReadCount = 0;
-            long keyspaceWriteCount = 0;
-            int keyspacePendingTasks = 0;
-            double keyspaceTotalReadTime = 0.0f;
-            double keyspaceTotalWriteTime = 0.0f;
-
-            outs.println("Keyspace: " + keyspaceName);
-            for (ColumnFamilyStoreMBean cfstore : columnFamilies)
-            {
-                String cfName = cfstore.getColumnFamilyName();
-                long writeCount = ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount();
-                long readCount = ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount();
-
-                if (readCount > 0)
-                {
-                    keyspaceReadCount += readCount;
-                    keyspaceTotalReadTime += (long)probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadTotalLatency");
-                }
-                if (writeCount > 0)
-                {
-                    keyspaceWriteCount += writeCount;
-                    keyspaceTotalWriteTime += (long)probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteTotalLatency");
-                }
-                keyspacePendingTasks += (int)probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks");
-            }
-
-            double keyspaceReadLatency = keyspaceReadCount > 0 ? keyspaceTotalReadTime / keyspaceReadCount / 1000 : Double.NaN;
-            double keyspaceWriteLatency = keyspaceWriteCount > 0 ? keyspaceTotalWriteTime / keyspaceWriteCount / 1000 : Double.NaN;
-
-            outs.println("\tRead Count: " + keyspaceReadCount);
-            outs.println("\tRead Latency: " + String.format("%s", keyspaceReadLatency) + " ms.");
-            outs.println("\tWrite Count: " + keyspaceWriteCount);
-            outs.println("\tWrite Latency: " + String.format("%s", keyspaceWriteLatency) + " ms.");
-            outs.println("\tPending Tasks: " + keyspacePendingTasks);
-
-            // print out column family statistics for this keyspace
-            for (ColumnFamilyStoreMBean cfstore : columnFamilies)
-            {
-                String cfName = cfstore.getColumnFamilyName();
-                if(cfName.contains("."))
-                    outs.println("\t\tTable (index): " + cfName);
-                else
-                    outs.println("\t\tTable: " + cfName);
-
-                outs.println("\t\tSSTable count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveSSTableCount"));
-
-                int[] leveledSStables = cfstore.getSSTableCountPerLevel();
-                if (leveledSStables != null)
-                {
-                    outs.print("\t\tSSTables in each level: [");
-                    for (int level = 0; level < leveledSStables.length; level++)
-                    {
-                        int count = leveledSStables[level];
-                        outs.print(count);
-                        long maxCount = 4L; // for L0
-                        if (level > 0)
-                            maxCount = (long) Math.pow(10, level);
-                        //  show max threshold for level when exceeded
-                        if (count > maxCount)
-                            outs.print("/" + maxCount);
-
-                        if (level < leveledSStables.length - 1)
-                            outs.print(", ");
-                        else
-                            outs.println("]");
-                    }
-                }
-                outs.println("\t\tSpace used (live), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveDiskSpaceUsed"));
-                outs.println("\t\tSpace used (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "TotalDiskSpaceUsed"));
-                outs.println("\t\tSpace used by snapshots (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "SnapshotsSize"));
-                outs.println("\t\tSSTable Compression Ratio: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionRatio"));
-                outs.println("\t\tMemtable cell count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableColumnsCount"));
-                outs.println("\t\tMemtable data size, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableDataSize"));
-                outs.println("\t\tMemtable switch count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableSwitchCount"));
-                outs.println("\t\tLocal read count: " + ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount());
-                double localReadLatency = ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getMean() / 1000;
-                double localRLatency = localReadLatency > 0 ? localReadLatency : Double.NaN;
-                outs.printf("\t\tLocal read latency: %01.3f ms%n", localRLatency);
-                outs.println("\t\tLocal write count: " + ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount());
-                double localWriteLatency = ((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getMean() / 1000;
-                double localWLatency = localWriteLatency > 0 ? localWriteLatency : Double.NaN;
-                outs.printf("\t\tLocal write latency: %01.3f ms%n", localWLatency);
-                outs.println("\t\tPending tasks: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks"));
-                outs.println("\t\tBloom filter false positives: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterFalsePositives"));
-                outs.println("\t\tBloom filter false ratio: " + String.format("%01.5f", probe.getColumnFamilyMetric(keyspaceName, cfName, "RecentBloomFilterFalseRatio")));
-                outs.println("\t\tBloom filter space used, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterDiskSpaceUsed"));
-                outs.println("\t\tCompacted partition minimum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MinRowSize"));
-                outs.println("\t\tCompacted partition maximum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MaxRowSize"));
-                outs.println("\t\tCompacted partition mean bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MeanRowSize"));
-                outs.println("\t\tAverage live cells per slice (last five minutes): " + ((JmxReporter.HistogramMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveScannedHistogram")).getMean());
-                outs.println("\t\tAverage tombstones per slice (last five minutes): " + ((JmxReporter.HistogramMBean)probe.getColumnFamilyMetric(keyspaceName, cfName, "TombstoneScannedHistogram")).getMean());
-
-                outs.println("");
-            }
-            outs.println("----------------");
-        }
-    }
-
-    public void printRemovalStatus(PrintStream outs)
-    {
-        outs.println("RemovalStatus: " + probe.getRemovalStatus());
-    }
-
-    private void printCfHistograms(String keySpace, String columnFamily, PrintStream output)
-    {
-        // calculate percentile of row size and column count
-        long[] estimatedRowSize = (long[]) probe.getColumnFamilyMetric(keySpace, columnFamily, "EstimatedRowSizeHistogram");
-        long[] estimatedColumnCount = (long[]) probe.getColumnFamilyMetric(keySpace, columnFamily, "EstimatedColumnCountHistogram");
-
-        long[] bucketOffsets = new EstimatedHistogram().getBucketOffsets();
-        EstimatedHistogram rowSizeHist = new EstimatedHistogram(bucketOffsets, estimatedRowSize);
-        EstimatedHistogram columnCountHist = new EstimatedHistogram(bucketOffsets, estimatedColumnCount);
-
-        // build arrays to store percentile values
-        double[] estimatedRowSizePercentiles = new double[7];
-        double[] estimatedColumnCountPercentiles = new double[7];
-        double[] offsetPercentiles = new double[]{0.5, 0.75, 0.95, 0.98, 0.99};
-        for (int i = 0; i < offsetPercentiles.length; i++)
-        {
-            estimatedRowSizePercentiles[i] = rowSizeHist.percentile(offsetPercentiles[i]);
-            estimatedColumnCountPercentiles[i] = columnCountHist.percentile(offsetPercentiles[i]);
-        }
-
-        // min value
-        estimatedRowSizePercentiles[5] = rowSizeHist.min();
-        estimatedColumnCountPercentiles[5] = columnCountHist.min();
-        // max value
-        estimatedRowSizePercentiles[6] = rowSizeHist.max();
-        estimatedColumnCountPercentiles[6] = columnCountHist.max();
-
-        String[] percentiles = new String[]{ "50%", "75%", "95%", "98%", "99%", "Min", "Max" };
-        double[] readLatency = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean)probe.getColumnFamilyMetric(keySpace, columnFamily, "ReadLatency"));
-        double[] writeLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean)probe.getColumnFamilyMetric(keySpace, columnFamily, "WriteLatency"));
-        double[] sstablesPerRead = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean)probe.getColumnFamilyMetric(keySpace, columnFamily, "SSTablesPerReadHistogram"));
-
-        output.println(String.format("%s/%s histograms", keySpace, columnFamily));
-        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
-                                     "Percentile", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
-        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
-                                     "", "", "(micros)", "(micros)", "(bytes)", ""));
-
-        for (int i = 0; i < percentiles.length; i++)
-        {
-            output.println(String.format("%-10s%10.2f%18.2f%18.2f%18.0f%18.0f",
-                                         percentiles[i],
-                                         sstablesPerRead[i],
-                                         writeLatency[i],
-                                         readLatency[i],
-                                         estimatedRowSizePercentiles[i],
-                                         estimatedColumnCountPercentiles[i]));
-        }
-        output.println();
-    }
-
-    private void printProxyHistograms(PrintStream output)
-    {
-        String[] percentiles = new String[]{ "50%", "75%", "95%", "98%", "99%", "Min", "Max" };
-        double[] readLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Read"));
-        double[] writeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Write"));
-        double[] rangeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("RangeSlice"));
-
-        output.println("proxy histograms");
-        output.println(String.format("%-10s%18s%18s%18s",
-                                     "Percentile", "Read Latency", "Write Latency", "Range Latency"));
-        output.println(String.format("%-10s%18s%18s%18s",
-                                     "", "(micros)", "(micros)", "(micros)"));
-        for (int i = 0; i < percentiles.length; i++)
-        {
-            output.println(String.format("%-10s%18.2f%18.2f%18.2f",
-                                         percentiles[i],
-                                         readLatency[i],
-                                         writeLatency[i],
-                                         rangeLatency[i]));
-        }
-        output.println();
-    }
-
-    private void printEndPoints(String keySpace, String cf, String key, PrintStream output)
-    {
-        List<InetAddress> endpoints = this.probe.getEndpoints(keySpace, cf, key);
-
-        for (InetAddress anEndpoint : endpoints)
-        {
-           output.println(anEndpoint.getHostAddress());
-        }
-    }
-
-    private void printSSTables(String keyspace, String cf, String key, PrintStream output)
-    {
-        List<String> sstables = this.probe.getSSTables(keyspace, cf, key);
-        for (String sstable : sstables)
-        {
-            output.println(sstable);
-        }
-    }
-
-    private void printIsNativeTransportRunning(PrintStream outs)
-    {
-        outs.println(probe.isNativeTransportRunning() ? "running" : "not running");
-    }
-
-    private void printIsThriftServerRunning(PrintStream outs)
-    {
-        outs.println(probe.isThriftServerRunning() ? "running" : "not running");
-    }
-
-    public static void main(String[] args) throws IOException, InterruptedException, ParseException
-    {
-        CommandLineParser parser = new PosixParser();
-        ToolCommandLine cmd = null;
-
-        try
-        {
-            cmd = new ToolCommandLine(parser.parse(options, args));
-        }
-        catch (ParseException p)
-        {
-            badUse(p.getMessage());
-        }
-
-        String host = cmd.hasOption(HOST_OPT.left) ? cmd.getOptionValue(HOST_OPT.left) : DEFAULT_HOST;
-
-        int port = DEFAULT_PORT;
-
-        String portNum = cmd.getOptionValue(PORT_OPT.left);
-        if (portNum != null)
-        {
-            try
-            {
-                port = Integer.parseInt(portNum);
-            }
-            catch (NumberFormatException e)
-            {
-                throw new ParseException("Port must be a number");
-            }
-        }
-
-        NodeCommand command = null;
-
-        try
-        {
-            command = cmd.getCommand();
-        }
-        catch (IllegalArgumentException e)
-        {
-            badUse(e.getMessage());
-        }
-
-        if(NodeCommand.HELP.equals(command))
-        {
-            printUsage();
-            System.exit(0);
-        }
-
-        NodeProbe probe = null;
-
-        try
-        {
-            String username = cmd.getOptionValue(USERNAME_OPT.left);
-            String password = cmd.getOptionValue(PASSWORD_OPT.left);
-
-            try
-            {
-                probe = username == null ? new NodeProbe(host, port) : new NodeProbe(host, port, username, password);
-            }
-            catch (IOException ioe)
-            {
-                Throwable inner = findInnermostThrowable(ioe);
-                if (inner instanceof ConnectException)
-                {
-                    System.err.printf("Failed to connect to '%s:%d': %s%n", host, port, inner.getMessage());
-                    System.exit(1);
-                }
-                else if (inner instanceof UnknownHostException)
-                {
-                    System.err.printf("Cannot resolve '%s': unknown host%n", host);
-                    System.exit(1);
-                }
-                else
-                {
-                    err(ioe, "Error connecting to remote JMX agent!");
-                }
-            }
-
-            NodeCmd nodeCmd = new NodeCmd(probe);
-
-            //print history here after we've already determined we can reasonably call cassandra
-            printHistory(args, cmd);
-
-            // Execute the requested command.
-            String[] arguments = cmd.getCommandArguments();
-            String tag;
-            String columnFamilyName = null;
-
-            switch (command)
-            {
-                case RING :
-                    if (arguments.length > 0) { nodeCmd.printRing(System.out, arguments[0]); }
-                    else                      { nodeCmd.printRing(System.out, null); };
-                    break;
-
-                case INFO            : nodeCmd.printInfo(System.out, cmd); break;
-                case CFSTATS         :
-                    boolean ignoreMode = cmd.hasOption(CFSTATS_IGNORE_OPT.left);
-                    if (arguments.length > 0) { nodeCmd.printColumnFamilyStats(System.out, ignoreMode, arguments); }
-                    else                      { nodeCmd.printColumnFamilyStats(System.out, false, null); }
-                    break;
-                case TPSTATS         : nodeCmd.printThreadPoolStats(System.out); break;
-                case VERSION         : nodeCmd.printReleaseVersion(System.out); break;
-                case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break;
-                case COMPACTIONHISTORY:nodeCmd.printCompactionHistory(System.out); break;
-                case DESCRIBECLUSTER : nodeCmd.printClusterDescription(System.out, host); break;
-                case DISABLEBINARY   : probe.stopNativeTransport(); break;
-                case ENABLEBINARY    : probe.startNativeTransport(); break;
-                case STATUSBINARY    : nodeCmd.printIsNativeTransportRunning(System.out); break;
-                case DISABLEGOSSIP   : probe.stopGossiping(); break;
-                case ENABLEGOSSIP    : probe.startGossiping(); break;
-                case DISABLEHANDOFF  : probe.disableHintedHandoff(); break;
-                case ENABLEHANDOFF   : probe.enableHintedHandoff(); break;
-                case PAUSEHANDOFF    : probe.pauseHintsDelivery(); break;
-                case RESUMEHANDOFF   : probe.resumeHintsDelivery(); break;
-                case DISABLETHRIFT   : probe.stopThriftServer(); break;
-                case ENABLETHRIFT    : probe.startThriftServer(); break;
-                case STATUSTHRIFT    : nodeCmd.printIsThriftServerRunning(System.out); break;
-                case RESETLOCALSCHEMA: probe.resetLocalSchema(); break;
-                case ENABLEBACKUP    : probe.setIncrementalBackupsEnabled(true); break;
-                case DISABLEBACKUP   : probe.setIncrementalBackupsEnabled(false); break;
-
-                case TRUNCATEHINTS:
-                    if (arguments.length > 1) badUse("Too many arguments.");
-                    else if (arguments.length == 1) probe.truncateHints(arguments[0]);
-                    else probe.truncateHints();
-                    break;
-
-                case STATUS :
-                    boolean resolveIp = cmd.hasOption(RESOLVE_IP.left);
-                    if (arguments.length > 0) nodeCmd.printClusterStatus(System.out, arguments[0], resolveIp);
-                    else                      nodeCmd.printClusterStatus(System.out, null, resolveIp);
-                    break;
-
-                case DECOMMISSION :
-                    if (arguments.length > 0)
-                    {
-                        System.err.println("Decommission will decommission the node you are connected to and does not take arguments!");
-                        System.exit(1);
-                    }
-                    probe.decommission();
-                    break;
-
-                case DRAIN :
-                    try { probe.drain(); }
-                    catch (ExecutionException ee) { err(ee, "Error occured during flushing"); }
-                    break;
-
-                case NETSTATS :
-                    if (arguments.length > 0) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[0]), System.out); }
-                    else                      { nodeCmd.printNetworkStats(null, System.out); }
-                    break;
-
-                case SNAPSHOT :
-                    columnFamilyName = cmd.getOptionValue(SNAPSHOT_COLUMNFAMILY_OPT.left);
-                    /* FALL THRU */
-                case CLEARSNAPSHOT :
-                    tag = cmd.getOptionValue(TAG_OPT.left);
-                    handleSnapshots(command, tag, arguments, columnFamilyName, probe);
-                    break;
-
-                case MOVE :
-                    if (arguments.length != 1) { badUse("Missing token argument for move."); }
-                    try
-                    {
-                        probe.move(arguments[0]);
-                    }
-                    catch (UnsupportedOperationException uoerror)
-                    {
-                        System.err.println(uoerror.getMessage());
-                        System.exit(1);
-                    }
-                    break;
-
-                case JOIN:
-                    if (probe.isJoined())
-                    {
-                        System.err.println("This node has already joined the ring.");
-                        System.exit(1);
-                    }
-
-                    probe.joinRing();
-                    break;
-
-                case SETCOMPACTIONTHROUGHPUT :
-                    if (arguments.length != 1) { badUse("Missing value argument."); }
-                    probe.setCompactionThroughput(Integer.parseInt(arguments[0]));
-                    break;
-
-                case SETSTREAMTHROUGHPUT :
-                    if (arguments.length != 1) { badUse("Missing value argument."); }
-                    probe.setStreamThroughput(Integer.parseInt(arguments[0]));
-                    break;
-
-                case SETTRACEPROBABILITY :
-                    if (arguments.length != 1) { badUse("Missing value argument."); }
-                    probe.setTraceProbability(Double.parseDouble(arguments[0]));
-                    break;
-
-                case REBUILD :
-                    if (arguments.length > 1) { badUse("Too many arguments."); }
-                    probe.rebuild(arguments.length == 1 ? arguments[0] : null);
-                    break;
-
-                case REMOVETOKEN :
-                    System.err.println("Warn: removetoken is deprecated, please use removenode instead");
-                case REMOVENODE  :
-                    if (arguments.length != 1) { badUse("Missing an argument for removenode (either status, force, or an ID)"); }
-                    else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); }
-                    else if (arguments[0].equals("force"))  { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); }
-                    else                                    { probe.removeNode(arguments[0]); }
-                    break;
-
-                case INVALIDATEKEYCACHE :
-                    probe.invalidateKeyCache();
-                    break;
-
-                case INVALIDATEROWCACHE :
-                    probe.invalidateRowCache();
-                    break;
-
-                case CLEANUP :
-                case COMPACT :
-                case REPAIR  :
-                case FLUSH   :
-                case SCRUB   :
-                case UPGRADESSTABLES   :
-                case DISABLEAUTOCOMPACTION:
-                case ENABLEAUTOCOMPACTION:
-                    optionalKSandCFs(command, cmd, arguments, probe);
-                    break;
-
-                case GETCOMPACTIONTHRESHOLD :
-                    if (arguments.length != 2) { badUse("getcompactionthreshold requires ks and cf args."); }
-                    nodeCmd.printCompactionThreshold(System.out, arguments[0], arguments[1]);
-                    break;
-
-                case GETCOMPACTIONTHROUGHPUT : nodeCmd.printCompactionThroughput(System.out); break;
-                case GETSTREAMTHROUGHPUT : nodeCmd.printStreamThroughput(System.out); break;
-
-                case CFHISTOGRAMS :
-                    if (arguments.length != 2) { badUse("cfhistograms requires ks and cf args"); }
-                    nodeCmd.printCfHistograms(arguments[0], arguments[1], System.out);
-                    break;
-
-                case SETCACHECAPACITY :
-                    if (arguments.length != 2) { badUse("setcachecapacity requires key-cache-capacity, and row-cache-capacity args."); }
-                    probe.setCacheCapacities(Integer.parseInt(arguments[0]), Integer.parseInt(arguments[1]));
-                    break;
-
-                case SETCACHEKEYSTOSAVE :
-                    if (arguments.length != 2) { badUse("setcachekeystosave requires key-cache-keys-to-save, and row-cache-keys-to-save args."); }
-                    probe.setCacheKeysToSave(Integer.parseInt(arguments[0]), Integer.parseInt(arguments[1]));
-                    break;
-
-                case SETCOMPACTIONTHRESHOLD :
-                    if (arguments.length != 4) { badUse("setcompactionthreshold requires ks, cf, min, and max threshold args."); }
-                    int minthreshold = Integer.parseInt(arguments[2]);
-                    int maxthreshold = Integer.parseInt(arguments[3]);
-                    if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds must be positive integers"); }
-                    if (minthreshold > maxthreshold)              { badUse("Min threshold cannot be greater than max."); }
-                    if (minthreshold < 2 && maxthreshold != 0)    { badUse("Min threshold must be at least 2"); }
-                    probe.setCompactionThreshold(arguments[0], arguments[1], minthreshold, maxthreshold);
-                    break;
-                case GETENDPOINTS :
-                    if (arguments.length != 3) { badUse("getendpoints requires ks, cf and key args"); }
-                    nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out);
-                    break;
-
-                case PROXYHISTOGRAMS :
-                    if (arguments.length != 0) { badUse("proxyhistograms does not take arguments"); }
-                    nodeCmd.printProxyHistograms(System.out);
-                    break;
-
-                case GETSSTABLES:
-                    if (arguments.length != 3) { badUse("getsstables requires ks, cf and key args"); }
-                    nodeCmd.printSSTables(arguments[0], arguments[1], arguments[2], System.out);
-                    break;
-
-                case REFRESH:
-                    if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); }
-                    probe.loadNewSSTables(arguments[0], arguments[1]);
-                    break;
-
-                case REBUILD_INDEX:
-                    if (arguments.length < 2) { badUse("rebuild_index requires ks and cf args"); }
-                    if (arguments.length >= 3)
-                        probe.rebuildIndex(arguments[0], arguments[1], arguments[2].split(","));
-                    else
-                        probe.rebuildIndex(arguments[0], arguments[1]);
-
-                    break;
-
-                case GOSSIPINFO : nodeCmd.printGossipInfo(System.out); break;
-
-                case STOP:
-                    if (arguments.length != 1) { badUse("stop requires a type."); }
-                    probe.stop(arguments[0].toUpperCase());
-                    break;
-
-                case STOPDAEMON:
-                    if (arguments.length != 0) { badUse("stopdaemon does not take arguments."); }
-                    try { probe.stopCassandraDaemon(); }
-                    catch (Throwable t) { System.out.println("Cassandra has shut down.\n"); }
-                    break;
-
-                case DESCRIBERING :
-                    if (arguments.length != 1) { badUse("Missing keyspace argument for describering."); }
-                    nodeCmd.printDescribeRing(arguments[0], System.out);
-                    break;
-
-                case RANGEKEYSAMPLE :
-                    nodeCmd.printRangeKeySample(System.out);
-                    break;
-
-                case RELOADTRIGGERS :
-                    probe.reloadTriggers();
-                    break;
-
-                default :
-                    throw new RuntimeException("Unreachable code.");
-            }
-        }
-        finally
-        {
-            if (probe != null)
-            {
-                try
-                {
-                    probe.close();
-                }
-                catch (IOException ex)
-                {
-                    // swallow the exception so the user will see the real one.
-                }
-            }
-        }
-        System.exit(probe.isFailed() ? 1 : 0);
-    }
-
-    private void printCompactionHistory(PrintStream out)
-    {
-        out.println("Compaction History: ");
-
-        TabularData tabularData = this.probe.getCompactionHistory();
-        if (tabularData.isEmpty())
-        {
-            out.printf("There is no compaction history");
-            return;
-        }
-
-        String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
-        List<String> indexNames = tabularData.getTabularType().getIndexNames();
-        out.printf(format, (Object[]) indexNames.toArray(new String[indexNames.size()]));
-
-        Set<?> values = tabularData.keySet();
-        for (Object eachValue : values)
-        {
-            List<?> value = (List<?>) eachValue;
-            out.printf(format, value.toArray(new Object[value.size()]));
-        }
-    }
-
-    private static void printHistory(String[] args, ToolCommandLine cmd)
-    {
-        //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
-        if (args.length == 0)
-            return;
-        String cmdLine = Joiner.on(" ").skipNulls().join(args);
-        final String password = cmd.getOptionValue(PASSWORD_OPT.left);
-        if (password != null)
-            cmdLine = cmdLine.replace(password, "<hidden>");
-
-        try (FileWriter writer = new FileWriter(new File(FBUtilities.getToolsOutputDirectory(), HISTORYFILE), true))
-        {
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-            writer.append(sdf.format(new Date()) + ": " + cmdLine + "\n");
-        }
-        catch (IOException ioe)
-        {
-            //quietly ignore any errors about not being able to write out history
-        }
-    }
-
-    private static Throwable findInnermostThrowable(Throwable ex)
-    {
-        Throwable inner = ex.getCause();
-        return inner == null ? ex : findInnermostThrowable(inner);
-    }
-
-    private void printDescribeRing(String keyspaceName, PrintStream out)
-    {
-        out.println("Schema Version:" + probe.getSchemaVersion());
-        out.println("TokenRange: ");
-        try
-        {
-            for (String tokenRangeString : probe.describeRing(keyspaceName))
-            {
-                out.println("\t" + tokenRangeString);
-            }
-        }
-        catch (IOException e)
-        {
-            err(e, e.getMessage());
-        }
-    }
-
-    private void printRangeKeySample(PrintStream outs)
-    {
-        outs.println("RangeKeySample: ");
-        List<String> tokenStrings = this.probe.sampleKeyRange();
-        for (String tokenString : tokenStrings)
-        {
-            outs.println("\t" + tokenString);
-        }
-    }
-
-    private void printGossipInfo(PrintStream out) {
-        out.println(probe.getGossipInfo());
-    }
-
-    private static void badUse(String useStr)
-    {
-        System.err.println(useStr);
-        printUsage();
-        System.exit(1);
-    }
-
-    private static void err(Exception e, String errStr)
-    {
-        System.err.println(errStr);
-        e.printStackTrace();
-        System.exit(3);
-    }
-
-    private static void complainNonzeroArgs(String[] args, NodeCommand cmd)
-    {
-        if (args.length > 0) {
-            System.err.println("Too many arguments for command '"+cmd.toString()+"'.");
-            printUsage();
-            System.exit(1);
-        }
-    }
-
-    private static void handleSnapshots(NodeCommand nc, String tag, String[] cmdArgs, String columnFamily, NodeProbe probe) throws IOException
-    {
-        String[] keyspaces = Arrays.copyOfRange(cmdArgs, 0, cmdArgs.length);
-        System.out.print("Requested " + ((nc == NodeCommand.SNAPSHOT) ? "creating" : "clearing") + " snapshot for: ");
-        if ( keyspaces.length > 0 )
-        {
-          for (int i = 0; i < keyspaces.length; i++)
-              System.out.print(keyspaces[i] + " ");
-        }
-        else
-        {
-            System.out.print("all keyspaces ");
-        }
-
-        if (columnFamily != null)
-        {
-            System.out.print("and table: " + columnFamily);
-        }
-        System.out.println();
-
-        switch (nc)
-        {
-            case SNAPSHOT :
-                if (tag == null || tag.equals(""))
-                    tag = new Long(System.currentTimeMillis()).toString();
-                probe.takeSnapshot(tag, columnFamily, keyspaces);
-                System.out.println("Snapshot directory: " + tag);
-                break;
-            case CLEARSNAPSHOT :
-                probe.clearSnapshot(tag, keyspaces);
-                break;
-        }
-    }
-
-    private static void optionalKSandCFs(NodeCommand nc, ToolCommandLine cmd, String[] cmdArgs, NodeProbe probe) throws InterruptedException, IOException
-    {
-        // if there is one additional arg, it's the keyspace; more are columnfamilies
-        List<String> keyspaces = cmdArgs.length == 0 ? probe.getKeyspaces() : Arrays.asList(cmdArgs[0]);
-        for (String keyspace : keyspaces)
-        {
-            if (!probe.getKeyspaces().contains(keyspace))
-            {
-                System.err.println("Keyspace [" + keyspace + "] does not exist.");
-                System.exit(1);
-            }
-        }
-
-        // second loop so we're less likely to die halfway through due to invalid keyspace
-        for (String keyspace : keyspaces)
-        {
-            String[] columnFamilies = cmdArgs.length <= 1 ? new String[0] : Arrays.copyOfRange(cmdArgs, 1, cmdArgs.length);
-            switch (nc)
-            {
-                case REPAIR  :
-                    boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
-                    boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
-                    boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
-                    boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
-                    Collection<String> dataCenters = null;
-                    if (specificDC)
-                        dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
-                    else if (localDC)
-                        dataCenters = Arrays.asList(probe.getDataCenter());
-                    if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
-                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
-                    else
-                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies);
-                    break;
-                case FLUSH   :
-                    try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }
-                    catch (ExecutionException ee) { err(ee, "Error occurred during flushing"); }
-                    break;
-                case COMPACT :
-                    try { probe.forceKeyspaceCompaction(keyspace, columnFamilies); }
-                    catch (ExecutionException ee) { err(ee, "Error occurred during compaction"); }
-                    break;
-                case CLEANUP :
-                    if (keyspace.equals(Keyspace.SYSTEM_KS)) { break; } // Skip cleanup on system cfs.
-                    try { probe.forceKeyspaceCleanup(keyspace, columnFamilies); }
-                    catch (ExecutionException ee) { err(ee, "Error occurred during cleanup"); }
-                    break;
-                case SCRUB :
-                    boolean disableSnapshot = cmd.hasOption(NO_SNAPSHOT.left);
-                    try { probe.scrub(disableSnapshot, keyspace, columnFamilies); }
-                    catch (ExecutionException ee) { err(ee, "Error occurred while scrubbing keyspace " + keyspace); }
-                    break;
-                case UPGRADESSTABLES :
-                    boolean excludeCurrentVersion = !cmd.hasOption(UPGRADE_ALL_SSTABLE_OPT.left);
-                    try { probe.upgradeSSTables(keyspace, excludeCurrentVersion, columnFamilies); }
-                    catch (ExecutionException ee) { err(ee, "Error occurred while upgrading the sstables for keyspace " + keyspace); }
-                    break;
-                case ENABLEAUTOCOMPACTION:
-                    probe.enableAutoCompaction(keyspace, columnFamilies);
-                    break;
-                case DISABLEAUTOCOMPACTION:
-                    probe.disableAutoCompaction(keyspace, columnFamilies);
-                    break;
-                default:
-                    throw new RuntimeException("Unreachable code.");
-            }
-        }
-    }
-
-    /**
-     * Used for filtering keyspaces and columnfamilies to be displayed using the cfstats command.
-     */
-    private static class OptionFilter
-    {
-        private Map<String, List<String>> filter = new HashMap<String, List<String>>();
-        private Map<String, List<String>> verifier = new HashMap<String, List<String>>();
-        private String [] filterList;
-        private boolean ignoreMode;
-
-        public OptionFilter(boolean ignoreMode, String... filterList)
-        {
-            this.filterList = filterList;
-            this.ignoreMode = ignoreMode;
-
-            if(filterList == null)
-                return;
-
-            for(String s : filterList)
-            {
-                String [] keyValues = s.split("\\.", 2);
-
-                // build the map that stores the ks' and cfs to use
-                if(!filter.containsKey(keyValues[0]))
-                {
-                    filter.put(keyValues[0], new ArrayList<String>());
-                    verifier.put(keyValues[0], new ArrayList<String>());
-
-                    if(keyValues.length == 2)
-                    {
-                        filter.get(keyValues[0]).add(keyValues[1]);
-                        verifier.get(keyValues[0]).add(keyValues[1]);
-                    }
-                }
-                else
-                {
-                    if(keyValues.length == 2)
-                    {
-                        filter.get(keyValues[0]).add(keyValues[1]);
-                        verifier.get(keyValues[0]).add(keyValues[1]);
-                    }
-                }
-            }
-        }
-
-        public boolean isColumnFamilyIncluded(String keyspace, String columnFamily)
-        {
-            // supplying empty params list is treated as wanting to display all kss & cfs
-            if(filterList == null)
-                return !ignoreMode;
-
-            List<String> cfs = filter.get(keyspace);
-
-            // no such keyspace is in the map
-            if (cfs == null)
-                return ignoreMode;
-                // only a keyspace with no cfs was supplied
-                // so ignore or include (based on the flag) every column family in specified keyspace
-            else if (cfs.size() == 0)
-                return !ignoreMode;
-
-            // keyspace exists, and it contains specific cfs
-            verifier.get(keyspace).remove(columnFamily);
-            return ignoreMode ^ cfs.contains(columnFamily);
-        }
-
-        public void verifyKeyspaces(List<String> keyspaces)
-        {
-            for(String ks : verifier.keySet())
-                if(!keyspaces.contains(ks))
-                    throw new RuntimeException("Unknown keyspace: " + ks);
-        }
-
-        public void verifyColumnFamilies()
-        {
-            for(String ks : filter.keySet())
-                if(verifier.get(ks).size() > 0)
-                    throw new RuntimeException("Unknown column families: " + verifier.get(ks).toString() + " in keyspace: " + ks);
-        }
-    }
-
-    private static class ToolOptions extends Options
-    {
-        public void addOption(Pair<String, String> opts, boolean hasArgument, String description)
-        {
-            addOption(opts, hasArgument, description, false);
-        }
-
-        public void addOption(Pair<String, String> opts, boolean hasArgument, String description, boolean required)
-        {
-            addOption(opts.left, opts.right, hasArgument, description, required);
-        }
-
-        public void addOption(String opt, String longOpt, boolean hasArgument, String description, boolean required)
-        {
-            Option option = new Option(opt, longOpt, hasArgument, description);
-            option.setRequired(required);
-            addOption(option);
-        }
-    }
-
-    private static class ToolCommandLine
-    {
-        private final CommandLine commandLine;
-
-        public ToolCommandLine(CommandLine commands)
-        {
-            commandLine = commands;
-        }
-
-        public Option[] getOptions()
-        {
-            return commandLine.getOptions();
-        }
-
-        public boolean hasOption(String opt)
-        {
-            return commandLine.hasOption(opt);
-        }
-
-        public String getOptionValue(String opt)
-        {
-            return commandLine.getOptionValue(opt);
-        }
-
-        public NodeCommand getCommand()
-        {
-            if (commandLine.getArgs().length == 0)
-                throw new IllegalArgumentException("Command was not specified.");
-
-            String command = commandLine.getArgs()[0];
-
-            try
-            {
-                return NodeCommand.valueOf(command.toUpperCase());
-            }
-            catch (IllegalArgumentException e)
-            {
-                throw new IllegalArgumentException("Unrecognized command: " + command);
-            }
-        }
-
-        public String[] getCommandArguments()
-        {
-            List params = commandLine.getArgList();
-
-            if (params.size() < 2) // command parameters are empty
-                return new String[0];
-
-            String[] toReturn = new String[params.size() - 1];
-
-            for (int i = 1; i < params.size(); i++)
-            {
-                String parm = (String) params.get(i);
-                // why? look at CASSANDRA-4808
-                if (parm.startsWith("\\"))
-                    parm = parm.substring(1);
-                toReturn[i - 1] = parm;
-            }
-            return toReturn;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/892d8e69/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index b19a4bc..7727401 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -64,7 +64,7 @@ import org.apache.cassandra.utils.SimpleCondition;
 /**
  * JMX client operations for Cassandra.
  */
-public class NodeProbe
+public class NodeProbe implements AutoCloseable
 {
     private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
     private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";