Posted to commits@cassandra.apache.org by br...@apache.org on 2013/06/20 20:18:17 UTC

git commit: Add standalone sstableupgrade utility. Patch by Nick Bailey, reviewed by brandonwilliams for CASSANDRA-5524

Updated Branches:
  refs/heads/cassandra-1.2 41f418a09 -> 3814af808


Add standalone sstableupgrade utility.
Patch by Nick Bailey, reviewed by brandonwilliams for CASSANDRA-5524


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3814af80
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3814af80
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3814af80

Branch: refs/heads/cassandra-1.2
Commit: 3814af8087c8b5541bea563344afcc344f5efa2a
Parents: 41f418a
Author: Brandon Williams <br...@apache.org>
Authored: Thu Jun 20 13:15:54 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Jun 20 13:15:54 2013 -0500

----------------------------------------------------------------------
 NEWS.txt                                        |  67 +++---
 bin/sstableupgrade                              |  55 +++++
 debian/cassandra.install                        |   1 +
 .../cassandra/db/compaction/Upgrader.java       | 167 ++++++++++++++
 .../cassandra/tools/StandaloneUpgrader.java     | 223 +++++++++++++++++++
 5 files changed, 482 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3814af80/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 5cb06da..dbc9aab 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -8,6 +8,11 @@ upgrade, just in case you need to roll back to the previous version.
 (Cassandra version X + 1 will always be able to read data files created
 by version X, but the inverse is not necessarily the case.)
 
+When upgrading major versions of Cassandra, you will be unable to
+restore snapshots created with the previous major version using the
+'sstableloader' tool. You can upgrade the file format of your snapshots
+using the provided 'sstableupgrade' tool.
+
 1.2.6
 =====
 
@@ -217,7 +222,7 @@ Features
     - num_tokens can now be specified in cassandra.yaml. This defines the
       number of tokens assigned to the host on the ring (default: 1).
       Also specifying initial_token will override any num_tokens setting.
-    - disk_failure_policy allows blacklisting failed disks in JBOD 
+    - disk_failure_policy allows blacklisting failed disks in JBOD
       configuration instead of erroring out indefinitely
     - event tracing can be configured per-connection ("trace_next_query")
       or globally/probabilistically ("nodetool settraceprobability")
@@ -314,7 +319,7 @@ Upgrading
       throw an InvalidRequestException when used for reads.  (Previous
       versions would silently perform a ONE read for range queries;
       single-row and multiget reads already rejected ANY.)
-    - The largest mutation batch accepted by the commitlog is now 128MB.  
+    - The largest mutation batch accepted by the commitlog is now 128MB.
       (In practice, batches larger than ~10MB always caused poor
       performance due to load volatility and GC promotion failures.)
       Larger batches will continue to be accepted but will not be
@@ -514,7 +519,7 @@ Upgrading
     - Upgrading from version 0.7.1+ or 0.8.2+ can be done with a rolling
       restart, one node at a time.  (0.8.0 or 0.8.1 are NOT network-compatible
       with 1.0: upgrade to the most recent 0.8 release first.)
-      You do not need to bring down the whole cluster at once. 
+      You do not need to bring down the whole cluster at once.
     - After upgrading, run nodetool scrub against each node before running
       repair, moving nodes, or adding new ones.
     - CQL inserts/updates now generate microsecond resolution timestamps
@@ -695,7 +700,7 @@ Upgrading
 ---------
     - Upgrading from version 0.7.1 or later can be done with a rolling
       restart, one node at a time.  You do not need to bring down the
-      whole cluster at once. 
+      whole cluster at once.
     - After upgrading, run nodetool scrub against each node before running
       repair, moving nodes, or adding new ones.
     - Running nodetool drain before shutting down the 0.7 node is
@@ -706,8 +711,8 @@ Upgrading
       to use your 0.7 clients.
     - Avro record classes used in map/reduce and Hadoop streaming code have
       been removed. Map/reduce can be switched to Thrift by changing
-      org.apache.cassandra.avro in import statements to 
-      org.apache.cassandra.thrift (no class names change). Streaming support 
+      org.apache.cassandra.avro in import statements to
+      org.apache.cassandra.thrift (no class names change). Streaming support
       has been removed for the time being.
     - The loadbalance command has been removed from nodetool.  For similar
       behavior, decommission then rebootstrap with empty initial_token.
@@ -721,15 +726,15 @@ Features
 --------
     - added CQL client API and JDBC/DBAPI2-compliant drivers for Java and
       Python, respectively (see: drivers/ subdirectory and doc/cql)
-    - added distributed Counters feature; 
+    - added distributed Counters feature;
       see http://wiki.apache.org/cassandra/Counters
     - optional intranode encryption; see comments around 'encryption_options'
       in cassandra.yaml
-    - compaction multithreading and rate-limiting; see 
+    - compaction multithreading and rate-limiting; see
       'concurrent_compactors' and 'compaction_throughput_mb_per_sec' in
       cassandra.yaml
     - cassandra will limit total memtable memory usage to 1/3 of the heap
-      by default.  This can be ajusted or disabled with the 
+      by default.  This can be ajusted or disabled with the
       memtable_total_space_in_mb option.  The old per-ColumnFamily
       throughput, operations, and age settings are still respected but
       will be removed in a future major release once we are satisfied that
@@ -738,7 +743,7 @@ Features
 Tools
 -----
     - stress and py_stress moved from contrib/ to tools/
-    - clustertool was removed (see 
+    - clustertool was removed (see
       https://issues.apache.org/jira/browse/CASSANDRA-2607 for examples
       of how to script nodetool across the cluster instead)
 
@@ -814,7 +819,7 @@ Upgrading
     - 0.7.1 and 0.7.2 shipped with a bug that caused incorrect row-level
       bloom filters to be generated when compacting sstables generated
       with earlier versions.  This would manifest in IOExceptions during
-      column name-based queries.  0.7.3 provides "nodetool scrub" to 
+      column name-based queries.  0.7.3 provides "nodetool scrub" to
       rebuild sstables with correct bloom filters, with no data lost.
       (If your cluster was never on 0.7.0 or earlier, you don't have to
       worry about this.)  Note that nodetool scrub will snapshot your
@@ -862,10 +867,10 @@ Features
     - Row size limit increased from 2GB to 2 billion columns.  rows
       are no longer read into memory during compaction.
     - Keyspace and ColumnFamily definitions may be added and modified live
-    - Streaming data for repair or node movement no longer requires 
+    - Streaming data for repair or node movement no longer requires
       anticompaction step first
-    - NetworkTopologyStrategy (formerly DatacenterShardStrategy) is ready for 
-      use, enabling ConsistencyLevel.DCQUORUM and DCQUORUMSYNC.  See comments 
+    - NetworkTopologyStrategy (formerly DatacenterShardStrategy) is ready for
+      use, enabling ConsistencyLevel.DCQUORUM and DCQUORUMSYNC.  See comments
       in `cassandra.yaml.`
     - Optional per-Column time-to-live field allows expiring data without
       have to issue explicit remove commands
@@ -879,9 +884,9 @@ Features
     - Optional round-robin scheduling between keyspaces for multitenant
       clusters
     - Dynamic endpoint snitch mitigates the impact of impaired nodes
-    - New `IntegerType`, faster than LongType and allows integers of 
+    - New `IntegerType`, faster than LongType and allows integers of
       both less and more bits than Long's 64
-    - A revamped authentication system that decouples authorization and 
+    - A revamped authentication system that decouples authorization and
       allows finer-grained control of resources.
 
 Upgrading
@@ -893,9 +898,9 @@ Upgrading
     The Cassandra inter-node protocol is incompatible with 0.6.x
     releases (and with 0.7 beta1), meaning you will have to bring your
     cluster down prior to upgrading: you cannot mix 0.6 and 0.7 nodes.
-    
+
     The hints schema was changed from 0.6 to 0.7. Cassandra automatically
-    snapshots and then truncates the hints column family as part of 
+    snapshots and then truncates the hints column family as part of
     starting up 0.7 for the first time.
 
     Keyspace and ColumnFamily definitions are stored in the system
@@ -904,13 +909,13 @@ Upgrading
     The process to upgrade is:
     1) run "nodetool drain" on _each_ 0.6 node.  When drain finishes (log
        message "Node is drained" appears), stop the process.
-    2) Convert your storage-conf.xml to the new cassandra.yaml using 
-       "bin/config-converter".  
+    2) Convert your storage-conf.xml to the new cassandra.yaml using
+       "bin/config-converter".
     3) Rename any of your keyspace or column family names that do not adhere
        to the '^\w+' regex convention.
     4) Start up your cluster with the 0.7 version.
-    5) Initialize your Keyspace and ColumnFamily definitions using 
-       "bin/schematool <host> <jmxport> import".  _You only need to do 
+    5) Initialize your Keyspace and ColumnFamily definitions using
+       "bin/schematool <host> <jmxport> import".  _You only need to do
        this to one node_.
 
 Thrift API
@@ -935,7 +940,7 @@ Configuraton
 ------------
     - Configuration file renamed to cassandra.yaml and log4j.properties to
       log4j-server.properties
-    - PropertyFileSnitch configuration file renamed to 
+    - PropertyFileSnitch configuration file renamed to
       cassandra-topology.properties
     - The ThriftAddress and ThriftPort directives have been renamed to
       RPCAddress and RPCPort respectively.
@@ -952,7 +957,7 @@ Configuraton
       one node_.
     - In addition to an authenticator, an authority must be configured as
       well. Users of SimpleAuthenticator should use SimpleAuthority for this
-      value (the default is AllowAllAuthority, which corresponds with 
+      value (the default is AllowAllAuthority, which corresponds with
       AllowAllAuthenticator).
     - The format of access.properties has changed, see the sample configuration
       conf/access.properties for documentation on the new format.
@@ -1011,7 +1016,7 @@ Features
 Configuraton
 ------------
     - MemtableSizeInMB has been replaced by MemtableThroughputInMB which
-      triggers a memtable flush when the specified amount of data has 
+      triggers a memtable flush when the specified amount of data has
       been written, including overwrites.
     - MemtableObjectCountInMillions has been replaced by the
       MemtableOperationsInMillions directive which causes a memtable flush
@@ -1047,7 +1052,7 @@ JMX metrics
       progress of the current compaction has been added.
     - commitlog JMX metrics are moved to org.apache.cassandra.db.Commitlog
     - progress of data streaming during bootstrap, loadbalance, or other
-      data migration, is available under 
+      data migration, is available under
       org.apache.cassandra.streaming.StreamingService.
       See http://wiki.apache.org/cassandra/Streaming for details.
 
@@ -1061,8 +1066,8 @@ Installation/Upgrade
 0.5.0
 =====
 
-0. The commitlog format has changed (but sstable format has not). 
-   When upgrading from 0.4, empty the commitlog either by running 
+0. The commitlog format has changed (but sstable format has not).
+   When upgrading from 0.4, empty the commitlog either by running
    bin/nodeprobe flush on each machine and waiting for the flush to finish,
    or simply remove the commitlog directory if you only have test data.
    (If more writes come in after the flush command, starting 0.5 will error
@@ -1083,7 +1088,7 @@ Installation/Upgrade
 
 3. Configuration:
      - Added "comment" field to ColumnFamily definition.
-     - Added MemtableFlushAfterMinutes, a global replacement for the 
+     - Added MemtableFlushAfterMinutes, a global replacement for the
        old per-CF FlushPeriodInMinutes setting
      - Key cache settings
 
@@ -1121,7 +1126,7 @@ Installation/Upgrade
    create and modify ColumnFamilies at will without worrying about
    collisions with others in the same cluster.
 
-3. Many Thrift API changes and documentation.  See 
+3. Many Thrift API changes and documentation.  See
    http://wiki.apache.org/cassandra/API
 
 4. Removed the web interface in favor of JMX and bin/nodeprobe, which
@@ -1166,4 +1171,4 @@ key in a given ColumnFamily) is limited by available memory, because
 compaction deserializes each row before merging.
 
 See https://issues.apache.org/jira/browse/CASSANDRA-16
-   
+

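The NEWS.txt note added above implies a two-step restore path for snapshots taken on an older major version: rewrite the snapshot's sstables with sstableupgrade first, then stream them back with sstableloader. A minimal sketch of the first step, using hypothetical names (Keyspace1, Standard1, and a snapshot called pre_upgrade); the argument order follows the tool's usage string:

    # rewrite only the sstables captured in the named snapshot
    bin/sstableupgrade Keyspace1 Standard1 pre_upgrade

Once the snapshot's files are on the current format, sstableloader can restore them as usual.
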
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3814af80/bin/sstableupgrade
----------------------------------------------------------------------
diff --git a/bin/sstableupgrade b/bin/sstableupgrade
new file mode 100755
index 0000000..b5ddd6a
--- /dev/null
+++ b/bin/sstableupgrade
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   `dirname $0`/cassandra.in.sh; do
+        if [ -r $include ]; then
+            . $include
+            break
+        fi
+    done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+    . $CASSANDRA_INCLUDE
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ -z $CLASSPATH ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+$JAVA -ea -cp $CLASSPATH -Xmx$MAX_HEAP_SIZE \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneUpgrader "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
+

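The launcher follows the same conventions as the other standalone tools: it sources the first readable cassandra.in.sh it finds (or the file named by CASSANDRA_INCLUDE), expects that include to set CLASSPATH, and caps the heap at 256M unless MAX_HEAP_SIZE is set. A hedged example of overriding those defaults; the include path here is illustrative, not mandated by the script:

    # use a specific include file and a larger heap for the upgrade run
    CASSANDRA_INCLUDE=/etc/cassandra/cassandra.in.sh \
    MAX_HEAP_SIZE=512M \
    bin/sstableupgrade Keyspace1 Standard1
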
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3814af80/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index 6d7ba8f..a504b78 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -17,6 +17,7 @@ bin/sstablekeys usr/bin
 bin/sstableloader usr/bin
 bin/cqlsh usr/bin
 bin/sstablescrub usr/bin
+bin/sstableupgrade usr/bin
 bin/cassandra-shuffle usr/bin
 tools/bin/cassandra-stress usr/bin
 tools/bin/token-generator usr/bin

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3814af80/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
new file mode 100644
index 0000000..e7211ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.IOError;
+import java.util.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.AbstractCompactionIterable;
+import org.apache.cassandra.db.compaction.CompactionIterable;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class Upgrader
+{
+    private final ColumnFamilyStore cfs;
+    private final SSTableReader sstable;
+    private final Collection<SSTableReader> toUpgrade;
+    private final File directory;
+
+    private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
+    private final CompactionController controller;
+    private final AbstractCompactionStrategy strategy;
+    private final long estimatedRows;
+
+    private final int gcBefore = CompactionManager.NO_GC;
+
+    private final OutputHandler outputHandler;
+
+    public Upgrader(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler)
+    {
+        this.cfs = cfs;
+        this.sstable = sstable;
+        this.toUpgrade = Collections.singletonList(sstable);
+        this.outputHandler = outputHandler;
+
+        this.directory = new File(sstable.getFilename()).getParentFile();
+
+        this.controller = new UpgradeController(cfs);
+
+        this.strategy = cfs.getCompactionStrategy();
+        long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
+        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize());
+        this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+    }
+
+    private SSTableWriter createCompactionWriter()
+    {
+        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector();
+
+        // Get the max timestamp of the precompacted sstables
+        // and adds generation of live ancestors
+        for (SSTableReader sstable : toUpgrade)
+        {
+            sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+            for (Integer i : sstable.getAncestors())
+            {
+                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                    sstableMetadataCollector.addAncestor(i);
+            }
+        }
+
+        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+    }
+
+    public void upgrade()
+    {
+        outputHandler.output("Upgrading " + sstable);
+
+
+        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller);
+
+        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+
+        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
+        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
+
+        try
+        {
+            SSTableWriter writer = createCompactionWriter();
+            writers.add(writer);
+            while (iter.hasNext())
+            {
+                AbstractCompactedRow row = iter.next();
+
+                RowIndexEntry indexEntry = writer.append(row);
+            }
+
+            long maxAge = CompactionTask.getMaxDataAge(this.toUpgrade);
+            for (SSTableWriter completedWriter : writers)
+                sstables.add(completedWriter.closeAndOpenReader(maxAge));
+
+            outputHandler.output("Upgrade of " + sstable + " complete.");
+
+        }
+        catch (Throwable t)
+        {
+            for (SSTableWriter writer : writers)
+                writer.abort();
+            // also remove already completed SSTables
+            for (SSTableReader sstable : sstables)
+            {
+                sstable.markCompacted();
+                sstable.releaseReference();
+            }
+            throw Throwables.propagate(t);
+        }
+        finally
+        {
+            controller.close();
+
+            try
+            {
+                iter.close();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static class UpgradeController extends CompactionController
+    {
+        public UpgradeController(ColumnFamilyStore cfs)
+        {
+            super(cfs, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        {
+            return false;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3814af80/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
new file mode 100644
index 0000000..357e99c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.Upgrader;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+public class StandaloneUpgrader
+{
+    static
+    {
+        CassandraDaemon.initLog4j();
+    }
+
+    private static final String TOOL_NAME = "sstableupgrade";
+    private static final String DEBUG_OPTION  = "debug";
+    private static final String HELP_OPTION  = "help";
+
+    public static void main(String args[]) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            if (Schema.instance.getCFMetaData(options.keyspace, options.cf) == null)
+                throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
+                                                                 options.keyspace,
+                                                                 options.cf));
+
+            Table table = Table.openWithoutSSTables(options.keyspace);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(options.cf);
+
+            OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug);
+            Directories.SSTableLister lister = cfs.directories.sstableLister();
+            if (options.snapshot != null)
+                lister.onlyBackups(true).snapshots(options.snapshot);
+            else
+                lister.includeBackups(false);
+
+            Collection<SSTableReader> readers = new ArrayList<SSTableReader>();
+
+            // Upgrade sstables
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA) || !components.contains(Component.PRIMARY_INDEX))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs.metadata);
+                    if (sstable.descriptor.version.equals(Descriptor.Version.CURRENT))
+                        continue;
+                    readers.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+
+                    continue;
+                }
+            }
+
+            int numSSTables = readers.size();
+            handler.output("Found " + numSSTables + " sstables that need upgrading.");
+
+            for (SSTableReader sstable : readers)
+            {
+                try
+                {
+                    Upgrader upgrader = new Upgrader(cfs, sstable, handler);
+                    upgrader.upgrade();
+
+                    sstable.markCompacted();
+                    sstable.releaseReference();
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error upgrading %s: %s", sstable, e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0);
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    private static class Options
+    {
+        public final String keyspace;
+        public final String cf;
+        public final String snapshot;
+
+        public boolean debug;
+
+        private Options(String keyspace, String cf, String snapshot)
+        {
+            this.keyspace = keyspace;
+            this.cf = cf;
+            this.snapshot = snapshot;
+        }
+
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length >= 4 || args.length < 2)
+                {
+                    String msg = args.length < 2 ? "Missing arguments" : "Too many arguments";
+                    errorMsg(msg, options);
+                    System.exit(1);
+                }
+
+                String keyspace = args[0];
+                String cf = args[1];
+                String snapshot = null;
+                if (args.length == 3)
+                    snapshot = args[2];
+
+                Options opts = new Options(keyspace, cf, snapshot);
+
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,          "display stack traces");
+            options.addOption("h",  HELP_OPTION,           "display this help message");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <keyspace> <cf> [snapshot]", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Upgrade the sstables in the given cf (or snapshot) to the current version of Cassandra. " );
+            header.append("This operation will rewrite the sstables in the specified cf to match the " );
+            header.append("currently installed version of Cassandra.\n");
+            header.append("The snapshot option will only upgrade the specified snapshot. Upgrading " );
+            header.append("snapshots is required before attempting to restore a snapshot taken in a " );
+            header.append("major version older than the major version Cassandra is currently running. " );
+            header.append("This will replace the files in the given snapshot as well as break any " );
+            header.append("hard links to live sstables." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}
+
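
For reference, Options.parseArgs above accepts two positional arguments (upgrade all live sstables of the column family) or three (upgrade only the named snapshot), plus --debug for stack traces and -h for help. A short sketch of the accepted invocations, with hypothetical keyspace and column family names:

    # upgrade every live sstable of the column family that is not already on the current version
    bin/sstableupgrade Keyspace1 Standard1

    # upgrade only the sstables in a snapshot, printing stack traces on errors
    bin/sstableupgrade --debug Keyspace1 Standard1 pre_upgrade

    # print usage and exit
    bin/sstableupgrade -h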