You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/08/22 10:18:31 UTC

[1/2] git commit: Add SSTableSplitter tool to split sstables offline

Updated Branches:
  refs/heads/cassandra-2.0.0 5774ecb40 -> 624a7a02f


Add SSTableSplitter tool to split sstables offline

patch by slebresne; reviewed by krummas for CASSANDRA-4766


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39066b72
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39066b72
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39066b72

Branch: refs/heads/cassandra-2.0.0
Commit: 39066b722607fa88e75d5bc772bd52f1ec8914a0
Parents: 9bb4d93
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Sun Nov 11 14:54:43 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 22 10:08:10 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +-
 bin/sstablesplit                                |  50 ++++
 debian/cassandra.install                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java |  14 +-
 .../db/compaction/SSTableSplitter.java          | 105 ++++++++
 .../cassandra/tools/StandaloneSplitter.java     | 256 +++++++++++++++++++
 7 files changed, 427 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1c963c..e887c27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@
  * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
  * Properly handle parsing huge map and set literals (CASSANDRA-5893)
  * Fix LCS L0 compaction may overlap in L1 (CASSANDRA-5907)
+ * New sstablesplit tool to split large sstables offline (CASSANDRA-4766)
 Merged from 1.1:
  * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 11281ee..491a438 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,9 +18,10 @@ using the provided 'sstableupgrade' tool.
 
 Features
 --------
-    - A history of executed nodetool commands is now captured. 
+    - A history of executed nodetool commands is now captured.
       It can be found in ~/.cassandra/nodetool.history. Other tools output files
       (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
+    - A new sstablesplit utility allows splitting large sstables offline.
 
 Defaults
 --------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/bin/sstablesplit
----------------------------------------------------------------------
diff --git a/bin/sstablesplit b/bin/sstablesplit
new file mode 100755
index 0000000..933a67d
--- /dev/null
+++ b/bin/sstablesplit
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "$(dirname "$0")/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA=`which java`
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+"$JAVA" -ea -cp "$CLASSPATH" -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneSplitter "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index a504b78..70d4b97 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -18,6 +18,7 @@ bin/sstableloader usr/bin
 bin/cqlsh usr/bin
 bin/sstablescrub usr/bin
 bin/sstableupgrade usr/bin
+bin/sstablesplit usr/bin
 bin/cassandra-shuffle usr/bin
 tools/bin/cassandra-stress usr/bin
 tools/bin/token-generator usr/bin

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a6b9f89..0fed0a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -98,7 +98,7 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+        CompactionController controller = getCompactionController(toCompact);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractCompactionTask
                 collector.finishCompaction(ci);
         }
 
-        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+        replaceCompactedSSTables(toCompact, sstables);
         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
         for (SSTableReader sstable : sstables)
         {
@@ -265,6 +265,16 @@ public class CompactionTask extends AbstractCompactionTask
         }
     }
 
+    protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+    {
+        cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+    }
+
+    protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+    {
+        return new CompactionController(cfs, toCompact, gcBefore);
+    }
+
     protected boolean partialCompactionsAcceptable()
     {
         return !isUserDefined;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
new file mode 100644
index 0000000..214c7a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+
+public class SSTableSplitter {
+
+    private final SplittingCompactionTask task;
+
+    private CompactionInfo.Holder info;
+
+    public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+    {
+        this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+    }
+
+    public void split() throws IOException
+    {
+        task.execute(new StatsCollector());
+    }
+
+    public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+    {
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+            SSTableSplitter.this.info = ci;
+        }
+
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+            // no-op
+        }
+    }
+
+    public static class SplittingCompactionTask extends CompactionTask
+    {
+        private final int sstableSizeInMB;
+
+        public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+        {
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+            this.sstableSizeInMB = sstableSizeInMB;
+
+            if (sstableSizeInMB <= 0)
+                throw new IllegalArgumentException("Invalid target size for SSTables, must be > 0 (got: " + sstableSizeInMB + ")");
+        }
+
+        @Override
+        protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+        {
+            return new SplitController(cfs, toCompact);
+        }
+
+        @Override
+        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+        {
+        }
+
+        @Override
+        protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+        {
+            return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
+        }
+
+        @Override
+        protected boolean partialCompactionsAcceptable()
+        {
+            return true;
+        }
+    }
+
+    public static class SplitController extends CompactionController
+    {
+        public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+        {
+            super(cfs, CompactionManager.NO_GC);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
new file mode 100644
index 0000000..1ce94be
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+public class StandaloneSplitter
+{
+    public static final int DEFAULT_SSTABLE_SIZE = 50;
+
+    static
+    {
+        CassandraDaemon.initLog4j();
+    }
+
+    private static final String TOOL_NAME = "sstablesplit"; // must match the bin/sstablesplit launcher name
+    private static final String VERBOSE_OPTION = "verbose";
+    private static final String DEBUG_OPTION = "debug";
+    private static final String HELP_OPTION = "help";
+    private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+    private static final String SIZE_OPTION = "size";
+
+    public static void main(String args[]) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            String ksName = null;
+            String cfName = null;
+            Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+            for (String filename : options.filenames)
+            {
+                File file = new File(filename);
+                if (!file.exists()) {
+                    System.out.println("Skipping inexisting file " + file);
+                    continue;
+                }
+
+                Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+                if (pair == null) {
+                    System.out.println("Skipping non sstable file " + file);
+                    continue;
+                }
+                Descriptor desc = pair.left;
+
+                if (ksName == null)
+                    ksName = desc.ksname;
+                else if (!ksName.equals(desc.ksname))
+                    throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+
+                if (cfName == null)
+                    cfName = desc.cfname;
+                else if (!cfName.equals(desc.cfname))
+                    throw new IllegalArgumentException("All sstables must be part of the same column family");
+
+                Set<Component> components = new HashSet<Component>(Arrays.asList(new Component[]{
+                    Component.DATA,
+                    Component.PRIMARY_INDEX,
+                    Component.FILTER,
+                    Component.COMPRESSION_INFO,
+                    Component.STATS
+                }));
+
+                Iterator<Component> iter = components.iterator();
+                while (iter.hasNext()) {
+                    Component component = iter.next();
+                    if (!(new File(desc.filenameFor(component)).exists()))
+                        iter.remove();
+                }
+                parsedFilenames.put(desc, components);
+            }
+
+            if (ksName == null || cfName == null)
+            {
+                System.err.println("No valid sstables to split");
+                System.exit(1);
+            }
+
+            // Do not load sstables since they might be broken
+            Table table = Table.openWithoutSSTables(ksName);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+            String snapshotName = "pre-split-" + System.currentTimeMillis();
+
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+            for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+            {
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+                    sstables.add(sstable);
+
+                    if (options.snapshot) {
+                        File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                        sstable.createLinks(snapshotDirectory.getPath());
+                    }
+
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            if (options.snapshot)
+                System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+
+            cfs.getDataTracker().markCompacting(sstables);
+            for (SSTableReader sstable : sstables)
+            {
+                try
+                {
+                    new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+
+                    // Remove the sstable
+                    sstable.markCompacted();
+                    sstable.releaseReference();
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0); // We need that to stop non daemonized threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    private static class Options
+    {
+        public final List<String> filenames;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean snapshot;
+        public int sizeInMB = DEFAULT_SSTABLE_SIZE; // default applies when --size is not given
+
+        private Options(List<String> filenames)
+        {
+            this.filenames = filenames;
+        }
+
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                {
+                    System.err.println("No sstables to split");
+                    printUsage(options);
+                    System.exit(1);
+                }
+                Options opts = new Options(Arrays.asList(args));
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+
+                if (cmd.hasOption(SIZE_OPTION))
+                    opts.sizeInMB = Integer.valueOf(cmd.getOptionValue(SIZE_OPTION));
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,          "display stack traces");
+            options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+            options.addOption("h",  HELP_OPTION,           "display this help message");
+            options.addOption(null, NO_SNAPSHOT_OPTION,    "don't snapshot the sstables before splitting");
+            options.addOption("s",  SIZE_OPTION, "size",   "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}


[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0.0

Posted by sl...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0.0

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/624a7a02
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/624a7a02
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/624a7a02

Branch: refs/heads/cassandra-2.0.0
Commit: 624a7a02f8e1240aabdac3117db64d8f3a5da99b
Parents: 5774ecb 39066b7
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Aug 22 10:17:54 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 22 10:17:54 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   1 +
 bin/sstablesplit                                |  50 ++++
 debian/cassandra.install                        |   1 +
 .../cassandra/db/compaction/CompactionTask.java |  14 +-
 .../db/compaction/SSTableSplitter.java          | 105 ++++++++
 .../cassandra/tools/StandaloneSplitter.java     | 256 +++++++++++++++++++
 7 files changed, 426 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 717dd4a,491a438..cd8c880
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -97,7 -21,12 +97,8 @@@ Feature
      - A history of executed nodetool commands is now captured.
        It can be found in ~/.cassandra/nodetool.history. Other tools output files
        (cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
 +     - A new sstablesplit utility allows splitting large sstables offline.
  
 -Defaults
 ---------
 -    - After performance testing for CASSANDRA-5727, the default LCS filesize
 -      has been changed from 5MB to 160MB.
  
  
  1.2.7

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/debian/cassandra.install
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a7b6c64,0fed0a2..c27a020
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -102,13 -96,9 +102,13 @@@ public class CompactionTask extends Abs
  
          // sanity check: all sstables must belong to the same cfs
          for (SSTableReader sstable : toCompact)
 -            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 +            assert sstable.descriptor.cfname.equals(cfs.name);
 +
 +        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
  
-         CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+         CompactionController controller = getCompactionController(toCompact);
 +        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
 +
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
@@@ -237,12 -222,18 +237,12 @@@
              {
                  throw new RuntimeException(e);
              }
 -
 -            if (collector != null)
 -                collector.finishCompaction(ci);
          }
  
-         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+         replaceCompactedSSTables(toCompact, sstables);
          // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
          for (SSTableReader sstable : sstables)
 -        {
 -            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeyMap.get(sstable.descriptor).entrySet())
 -               sstable.cacheKey(entry.getKey(), entry.getValue());
 -        }
 +            sstable.preheat(cachedKeyMap.get(sstable.descriptor));
  
          if (logger.isInfoEnabled())
          {
@@@ -274,20 -265,16 +274,30 @@@
          }
      }
  
 +    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
 +    {
 +        return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
 +                                 keysPerSSTable,
 +                                 cfs.metadata,
 +                                 cfs.partitioner,
 +                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
 +    }
 +
 +    protected int getLevel()
 +    {
 +        return 0;
 +    }
 +
+     protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+     {
+         cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+     }
+ 
 -    protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
++    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+     {
+         return new CompactionController(cfs, toCompact, gcBefore);
+     }
+ 
      protected boolean partialCompactionsAcceptable()
      {
          return !isUserDefined;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 0000000,214c7a1..a22b7ca
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@@ -1,0 -1,105 +1,105 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.io.sstable.*;
+ 
+ public class SSTableSplitter {
+ 
+     private final SplittingCompactionTask task;
+ 
+     private CompactionInfo.Holder info;
+ 
+     public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+     {
+         this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+     }
+ 
+     public void split() throws IOException
+     {
+         task.execute(new StatsCollector());
+     }
+ 
+     public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+     {
+         public void beginCompaction(CompactionInfo.Holder ci)
+         {
+             SSTableSplitter.this.info = ci;
+         }
+ 
+         public void finishCompaction(CompactionInfo.Holder ci)
+         {
+             // no-op
+         }
+     }
+ 
+     public static class SplittingCompactionTask extends CompactionTask
+     {
+         private final int sstableSizeInMB;
+ 
+         public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+         {
+             super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+             this.sstableSizeInMB = sstableSizeInMB;
+ 
+             if (sstableSizeInMB <= 0)
+                 throw new IllegalArgumentException("Invalid target size for SSTables, must be > 0 (got: " + sstableSizeInMB + ")");
+         }
+ 
+         @Override
 -        protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
++        protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
+         {
+             return new SplitController(cfs, toCompact);
+         }
+ 
+         @Override
+         protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+         {
+         }
+ 
+         @Override
+         protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+         {
+             return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
+         }
+ 
+         @Override
+         protected boolean partialCompactionsAcceptable()
+         {
+             return true;
+         }
+     }
+ 
+     public static class SplitController extends CompactionController
+     {
+         public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+         {
+             super(cfs, CompactionManager.NO_GC);
+         }
+ 
+         @Override
+         public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+         {
+             return false;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/624a7a02/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 0000000,1ce94be..245d824
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@@ -1,0 -1,256 +1,256 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.cassandra.tools;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.*;
+ 
+ import org.apache.commons.cli.*;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories;
 -import org.apache.cassandra.db.Table;
++import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.compaction.LeveledManifest;
+ import org.apache.cassandra.db.compaction.SSTableSplitter;
+ import org.apache.cassandra.io.sstable.*;
+ import org.apache.cassandra.service.CassandraDaemon;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+ 
+ public class StandaloneSplitter
+ {
+     /** Default maximum size, in MB, of the output sstables (as stated in the --size help text). */
+     public static final int DEFAULT_SSTABLE_SIZE = 50;
+ 
+     static
+     {
+         CassandraDaemon.initLog4j();
+     }
+ 
+     // Name printed in the usage line; must match the bin/sstablesplit launcher script.
+     private static final String TOOL_NAME = "sstablesplit";
+     // Long names of the supported command-line options.
+     private static final String VERBOSE_OPTION = "verbose";
+     private static final String DEBUG_OPTION = "debug";
+     private static final String HELP_OPTION = "help";
+     private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+     private static final String SIZE_OPTION = "size";
+ 
+     /**
+      * Command-line entry point of the offline splitter.
+      *
+      * Parses the arguments, resolves every given file name to an sstable descriptor
+      * (all of them must belong to the same keyspace and the same column family),
+      * optionally links the sstables into a "pre-split-&lt;millis&gt;" snapshot, then
+      * splits each sstable with {@link SSTableSplitter} and marks the original obsolete.
+      *
+      * The JVM is terminated explicitly: status 0 once all sstables have been processed
+      * (per-sstable load/split errors are only reported on stderr), status 1 when no
+      * valid sstable was given or when an exception escapes the main flow.
+      *
+      * @param args command-line arguments, see {@code Options.parseArgs}
+      */
+     public static void main(String args[]) throws IOException
+     {
+         Options options = Options.parseArgs(args);
+         try
+         {
+             // load keyspace descriptions.
+             DatabaseDescriptor.loadSchemas();
+ 
+             // Keyspace / column family shared by all inputs; fixed by the first valid file.
+             String ksName = null;
+             String cfName = null;
+             Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+             for (String filename : options.filenames)
+             {
+                 File file = new File(filename);
+                 if (!file.exists()) {
+                     System.out.println("Skipping inexisting file " + file);
+                     continue;
+                 }
+ 
+                 Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+                 if (pair == null) {
+                     System.out.println("Skipping non sstable file " + file);
+                     continue;
+                 }
+                 Descriptor desc = pair.left;
+ 
+                 if (ksName == null)
+                     ksName = desc.ksname;
+                 else if (!ksName.equals(desc.ksname))
+                     throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+ 
+                 if (cfName == null)
+                     cfName = desc.cfname;
+                 else if (!cfName.equals(desc.cfname))
+                     throw new IllegalArgumentException("All sstables must be part of the same column family");
+ 
+                 // Candidate components for this sstable; those whose file is missing on disk
+                 // are removed just below.
+                 Set<Component> components = new HashSet<Component>(Arrays.asList(new Component[]{
+                     Component.DATA,
+                     Component.PRIMARY_INDEX,
+                     Component.FILTER,
+                     Component.COMPRESSION_INFO,
+                     Component.STATS
+                 }));
+ 
+                 Iterator<Component> iter = components.iterator();
+                 while (iter.hasNext()) {
+                     Component component = iter.next();
+                     if (!(new File(desc.filenameFor(component)).exists()))
+                         iter.remove();
+                 }
+                 parsedFilenames.put(desc, components);
+             }
+ 
+             // Every argument was skipped: nothing to work on.
+             if (ksName == null || cfName == null)
+             {
+                 System.err.println("No valid sstables to split");
+                 System.exit(1);
+             }
+ 
+             // Do not load sstables since they might be broken
 -            Table table = Table.openWithoutSSTables(ksName);
 -            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
++            Keyspace keyspace = Keyspace.openWithoutSSTables(ksName);
++            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+ 
+             // One snapshot name for the whole run, made unique by the current time.
+             String snapshotName = "pre-split-" + System.currentTimeMillis();
+ 
+             List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+             for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+             {
+                 try
+                 {
+                     SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+                     sstables.add(sstable);
+ 
+                     // Snapshot unless --no-snapshot was given.
+                     if (options.snapshot) {
+                         File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                         sstable.createLinks(snapshotDirectory.getPath());
+                     }
+ 
+                 }
+                 catch (Exception e)
+                 {
+                     // A failure here is reported and the loop moves on to the next sstable.
+                     System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+                     if (options.debug)
+                         e.printStackTrace(System.err);
+                 }
+             }
+             if (options.snapshot)
+                 System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+ 
+             cfs.getDataTracker().markCompacting(sstables);
+             for (SSTableReader sstable : sstables)
+             {
+                 try
+                 {
+                     new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+ 
+                     // Remove the sstable
 -                    sstable.markCompacted();
++                    sstable.markObsolete();
+                     sstable.releaseReference();
+                 }
+                 catch (Exception e)
+                 {
+                     // The original is neither marked obsolete nor released when its split fails.
+                     System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+                     if (options.debug)
+                         e.printStackTrace(System.err);
+                 }
+             }
+             SSTableDeletingTask.waitForDeletions();
+             System.exit(0); // We need that to stop non daemonized threads
+         }
+         catch (Exception e)
+         {
+             // Anything not handled per sstable ends the run with a failure status.
+             System.err.println(e.getMessage());
+             if (options.debug)
+                 e.printStackTrace(System.err);
+             System.exit(1);
+         }
+     }
+ 
+     /**
+      * Command-line options of the tool, as parsed by {@link #parseArgs(String[])}.
+      */
+     private static class Options
+     {
+         /** Positional arguments: the sstable files to split. */
+         public final List<String> filenames;
+ 
+         public boolean debug;
+         public boolean verbose;
+         public boolean snapshot;
+         // Starts at the advertised default so that omitting --size does not leave this at 0,
+         // a value the split task rejects as an invalid target size.
+         public int sizeInMB = DEFAULT_SSTABLE_SIZE;
+ 
+         private Options(List<String> filenames)
+         {
+             this.filenames = filenames;
+         }
+ 
+         /**
+          * Parses the command line.
+          *
+          * Prints the usage and terminates the JVM when help is requested (status 0),
+          * or when no file is given, the arguments cannot be parsed, or the --size
+          * value is not a strictly positive integer (status 1).
+          *
+          * @param cmdArgs raw command-line arguments
+          * @return the parsed options
+          */
+         public static Options parseArgs(String cmdArgs[])
+         {
+             CommandLineParser parser = new GnuParser();
+             CmdLineOptions options = getCmdLineOptions();
+             try
+             {
+                 CommandLine cmd = parser.parse(options, cmdArgs, false);
+ 
+                 if (cmd.hasOption(HELP_OPTION))
+                 {
+                     printUsage(options);
+                     System.exit(0);
+                 }
+ 
+                 String[] args = cmd.getArgs();
+                 if (args.length == 0)
+                 {
+                     System.err.println("No sstables to split");
+                     printUsage(options);
+                     System.exit(1);
+                 }
+                 Options opts = new Options(Arrays.asList(args));
+                 opts.debug = cmd.hasOption(DEBUG_OPTION);
+                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                 opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+ 
+                 if (cmd.hasOption(SIZE_OPTION))
+                     opts.sizeInMB = parseSize(cmd.getOptionValue(SIZE_OPTION), options);
+ 
+                 return opts;
+             }
+             catch (ParseException e)
+             {
+                 errorMsg(e.getMessage(), options);
+                 return null;
+             }
+         }
+ 
+         /**
+          * Converts the --size argument to a strictly positive number of MB, reporting a
+          * usage error (which exits the JVM) instead of letting a NumberFormatException
+          * escape or a non-positive size reach the splitter after snapshots were taken.
+          */
+         private static int parseSize(String value, CmdLineOptions options)
+         {
+             int size = 0;
+             try
+             {
+                 size = Integer.parseInt(value);
+             }
+             catch (NumberFormatException e)
+             {
+                 errorMsg("Invalid value for --" + SIZE_OPTION + ": '" + value + "' is not an integer", options);
+             }
+             if (size <= 0)
+                 errorMsg("Invalid value for --" + SIZE_OPTION + ": must be > 0 (got: " + size + ")", options);
+             return size;
+         }
+ 
+         /** Prints the message and the usage on stderr/stdout, then exits with status 1. */
+         private static void errorMsg(String msg, CmdLineOptions options)
+         {
+             System.err.println(msg);
+             printUsage(options);
+             System.exit(1);
+         }
+ 
+         /** Builds the set of options understood by the tool. */
+         private static CmdLineOptions getCmdLineOptions()
+         {
+             CmdLineOptions options = new CmdLineOptions();
+             options.addOption(null, DEBUG_OPTION,          "display stack traces");
+             options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+             options.addOption("h",  HELP_OPTION,           "display this help message");
+             options.addOption(null, NO_SNAPSHOT_OPTION,    "don't snapshot the sstables before splitting");
+             options.addOption("s",  SIZE_OPTION, "size",   "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+             return options;
+         }
+ 
+         /** Prints the usage line, a short description and the option list. */
+         public static void printUsage(CmdLineOptions options)
+         {
+             String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+             StringBuilder header = new StringBuilder();
+             header.append("--\n");
+             header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+             header.append("\n--\n");
+             header.append("Options are:");
+             new HelpFormatter().printHelp(usage, header.toString(), options, "");
+         }
+     }
+ }