You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/08/22 10:08:20 UTC
git commit: Add SSTableSplitter tool to split sstables offline
Updated Branches:
refs/heads/cassandra-1.2 9bb4d93e3 -> 39066b722
Add SSTableSplitter tool to split sstables offline
patch by slebresne; reviewed by krummas for CASSANDRA-4766
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39066b72
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39066b72
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39066b72
Branch: refs/heads/cassandra-1.2
Commit: 39066b722607fa88e75d5bc772bd52f1ec8914a0
Parents: 9bb4d93
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Sun Nov 11 14:54:43 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 22 10:08:10 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +-
bin/sstablesplit | 50 ++++
debian/cassandra.install | 1 +
.../cassandra/db/compaction/CompactionTask.java | 14 +-
.../db/compaction/SSTableSplitter.java | 105 ++++++++
.../cassandra/tools/StandaloneSplitter.java | 256 +++++++++++++++++++
7 files changed, 427 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1c963c..e887c27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@
* Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
* Properly handle parsing huge map and set literals (CASSANDRA-5893)
* Fix LCS L0 compaction may overlap in L1 (CASSANDRA-5907)
+ * New sstablesplit tool to split large sstables offline (CASSANDRA-4766)
Merged from 1.1:
* Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 11281ee..491a438 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,9 +18,10 @@ using the provided 'sstableupgrade' tool.
Features
--------
- - A history of executed nodetool commands is now captured.
+ - A history of executed nodetool commands is now captured.
It can be found in ~/.cassandra/nodetool.history. Other tools output files
(cli and cqlsh history, .cqlshrc) are now centralized in ~/.cassandra, as well.
+ - A new sstablesplit utility allows splitting large sstables offline.
Defaults
--------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/bin/sstablesplit
----------------------------------------------------------------------
diff --git a/bin/sstablesplit b/bin/sstablesplit
new file mode 100755
index 0000000..933a67d
--- /dev/null
+++ b/bin/sstablesplit
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Locate and source cassandra.in.sh. When CASSANDRA_INCLUDE is not set, the
+# first readable candidate in the list below wins. Every expansion is quoted
+# so that paths containing whitespace are handled correctly.
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "$(dirname "$0")/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA=`which java`
+fi
+
+# Fail early with a clear message instead of trying to execute "-ea" below.
+if [ -z "$JAVA" ]; then
+    echo "Unable to find java executable. Check JAVA_HOME and PATH environment variables." >&2
+    exit 1
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+"$JAVA" -ea -cp "$CLASSPATH" -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.StandaloneSplitter "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/debian/cassandra.install
----------------------------------------------------------------------
diff --git a/debian/cassandra.install b/debian/cassandra.install
index a504b78..70d4b97 100644
--- a/debian/cassandra.install
+++ b/debian/cassandra.install
@@ -18,6 +18,7 @@ bin/sstableloader usr/bin
bin/cqlsh usr/bin
bin/sstablescrub usr/bin
bin/sstableupgrade usr/bin
+bin/sstablesplit usr/bin
bin/cassandra-shuffle usr/bin
tools/bin/cassandra-stress usr/bin
tools/bin/token-generator usr/bin
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a6b9f89..0fed0a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -98,7 +98,7 @@ public class CompactionTask extends AbstractCompactionTask
for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
- CompactionController controller = new CompactionController(cfs, toCompact, gcBefore);
+ CompactionController controller = getCompactionController(toCompact);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractCompactionTask
collector.finishCompaction(ci);
}
- cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
+ replaceCompactedSSTables(toCompact, sstables);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (SSTableReader sstable : sstables)
{
@@ -265,6 +265,16 @@ public class CompactionTask extends AbstractCompactionTask
}
}
+ /**
+ * Hands the result of the compaction over to the column family store.
+ * Extracted into its own method so that subclasses can override this step.
+ *
+ * @param compacted the sstables that were used as input of the compaction
+ * @param replacements the sstables written by the compaction
+ */
+ protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+ {
+ cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
+ }
+
+ /**
+ * Creates the controller used for this compaction.
+ * Extracted into its own method so that subclasses can supply a different controller.
+ *
+ * @param toCompact the sstables about to be compacted
+ * @return a controller built from this task's store, the given sstables and this task's gcBefore
+ */
+ protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+ {
+ return new CompactionController(cfs, toCompact, gcBefore);
+ }
+
protected boolean partialCompactionsAcceptable()
{
return !isUserDefined;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
new file mode 100644
index 0000000..214c7a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+
+/**
+ * Splits one sstable into several smaller ones by running it, on its own,
+ * through a specialised {@link CompactionTask}.
+ */
+public class SSTableSplitter
+{
+    private final SplittingCompactionTask task;
+
+    // Assigned by StatsCollector.beginCompaction(); not read anywhere in this class.
+    private CompactionInfo.Holder info;
+
+    /**
+     * @param cfs the column family store the sstable belongs to
+     * @param sstable the sstable to split
+     * @param sstableSizeInMB size limit, in MB, for each output sstable; must be > 0
+     * @throws IllegalArgumentException if sstableSizeInMB is not strictly positive
+     */
+    public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+    {
+        task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+    }
+
+    /**
+     * Runs the splitting task with a fresh {@link StatsCollector}.
+     */
+    public void split() throws IOException
+    {
+        StatsCollector collector = new StatsCollector();
+        task.execute(collector);
+    }
+
+    /**
+     * Stats collector that only keeps hold of the info holder it is handed
+     * when the compaction begins.
+     */
+    public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
+    {
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+            info = ci;
+        }
+
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+            // nothing to do
+        }
+    }
+
+    /**
+     * Compaction task over a single sstable that starts a new output segment
+     * each time the current one grows past the configured size.
+     */
+    public static class SplittingCompactionTask extends CompactionTask
+    {
+        // Size limit of one output sstable, converted from MB to bytes once.
+        private final long maxSSTableSizeInBytes;
+
+        public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+        {
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+
+            if (sstableSizeInMB <= 0)
+                throw new IllegalArgumentException("Invalid target size for SSTables, must be > 0 (got: " + sstableSizeInMB + ")");
+
+            // long arithmetic, so large MB values cannot overflow an int
+            maxSSTableSizeInBytes = sstableSizeInMB * 1024L * 1024L;
+        }
+
+        /** Any subset of the input is acceptable for a split. */
+        @Override
+        protected boolean partialCompactionsAcceptable()
+        {
+            return true;
+        }
+
+        /** A new segment is due once the writer's on-disk position exceeds the size limit. */
+        @Override
+        protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
+        {
+            long written = writer.getOnDiskFilePointer();
+            return written > maxSSTableSizeInBytes;
+        }
+
+        /** Uses a controller that never purges data. */
+        @Override
+        protected CompactionController getCompactionController(Collection<SSTableReader> toCompact)
+        {
+            return new SplitController(cfs, toCompact);
+        }
+
+        /** Deliberately does nothing: the output is not handed over to the column family store here. */
+        @Override
+        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
+        {
+        }
+    }
+
+    /**
+     * Compaction controller for splits: {@link #shouldPurge} always answers false.
+     */
+    public static class SplitController extends CompactionController
+    {
+        public SplitController(ColumnFamilyStore cfs, Collection<SSTableReader> toCompact)
+        {
+            // toCompact is accepted for signature symmetry but is not forwarded to the parent
+            super(cfs, CompactionManager.NO_GC);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        {
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39066b72/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
new file mode 100644
index 0000000..1ce94be
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+public class StandaloneSplitter
+{
+    /** Default maximum size, in MB, advertised in the help text for the output sstables. */
+    public static final int DEFAULT_SSTABLE_SIZE = 50;
+
+    static
+    {
+        // Configure log4j as soon as this class is loaded.
+        CassandraDaemon.initLog4j();
+    }
+
+    // Name printed in the usage line; must match the launcher script (bin/sstablesplit).
+    private static final String TOOL_NAME = "sstablesplit";
+
+    // Long names of the supported command line options.
+    private static final String VERBOSE_OPTION = "verbose";
+    private static final String DEBUG_OPTION = "debug";
+    private static final String HELP_OPTION = "help";
+    private static final String NO_SNAPSHOT_OPTION = "no-snapshot";
+    private static final String SIZE_OPTION = "size";
+
+ /**
+ * Entry point of the offline split tool.
+ *
+ * Resolves the file names given on the command line to sstable descriptors
+ * (all of which must belong to one keyspace and one column family), opens
+ * them, optionally snapshots them, splits each one and finally marks the
+ * originals as compacted. The JVM is always terminated through
+ * System.exit(): status 0 at the end of a run, status 1 when no valid
+ * sstable was given or an unexpected exception reached the outer handler.
+ *
+ * @param args command line arguments, parsed by Options.parseArgs
+ */
+ public static void main(String args[]) throws IOException
+ {
+ Options options = Options.parseArgs(args);
+ try
+ {
+ // load keyspace descriptions.
+ DatabaseDescriptor.loadSchemas();
+
+ // Keyspace / column family shared by all inputs; taken from the first valid file.
+ String ksName = null;
+ String cfName = null;
+ // Descriptor of each input sstable -> components of it that exist on disk.
+ Map<Descriptor, Set<Component>> parsedFilenames = new HashMap<Descriptor, Set<Component>>();
+ for (String filename : options.filenames)
+ {
+ File file = new File(filename);
+ if (!file.exists()) {
+ System.out.println("Skipping inexisting file " + file);
+ continue;
+ }
+
+ Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+ if (pair == null) {
+ System.out.println("Skipping non sstable file " + file);
+ continue;
+ }
+ Descriptor desc = pair.left;
+
+ if (ksName == null)
+ ksName = desc.ksname;
+ else if (!ksName.equals(desc.ksname))
+ throw new IllegalArgumentException("All sstables must be part of the same keyspace");
+
+ if (cfName == null)
+ cfName = desc.cfname;
+ else if (!cfName.equals(desc.cfname))
+ throw new IllegalArgumentException("All sstables must be part of the same column family");
+
+ // Start from the full list of components we care about...
+ Set<Component> components = new HashSet<Component>(Arrays.asList(new Component[]{
+ Component.DATA,
+ Component.PRIMARY_INDEX,
+ Component.FILTER,
+ Component.COMPRESSION_INFO,
+ Component.STATS
+ }));
+
+ // ...and drop those whose file is not present for this sstable.
+ Iterator<Component> iter = components.iterator();
+ while (iter.hasNext()) {
+ Component component = iter.next();
+ if (!(new File(desc.filenameFor(component)).exists()))
+ iter.remove();
+ }
+ parsedFilenames.put(desc, components);
+ }
+
+ // Both stay null only if every argument was skipped above.
+ if (ksName == null || cfName == null)
+ {
+ System.err.println("No valid sstables to split");
+ System.exit(1);
+ }
+
+ // Do not load sstables since they might be broken
+ Table table = Table.openWithoutSSTables(ksName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ // One snapshot name, timestamped at start-up, is shared by all input sstables.
+ String snapshotName = "pre-split-" + System.currentTimeMillis();
+
+ List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+ for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
+ {
+ try
+ {
+ SSTableReader sstable = SSTableReader.openNoValidation(fn.getKey(), fn.getValue(), cfs.metadata);
+ sstables.add(sstable);
+
+ // NOTE(review): the sstable is queued for splitting before its snapshot links are
+ // created, so a failure in createLinks() leaves it queued without a snapshot - confirm
+ // this is intended.
+ if (options.snapshot) {
+ File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+ sstable.createLinks(snapshotDirectory.getPath());
+ }
+
+ }
+ catch (Exception e)
+ {
+ // A failure on one sstable is reported and the remaining ones are still processed.
+ System.err.println(String.format("Error Loading %s: %s", fn.getKey(), e.getMessage()));
+ if (options.debug)
+ e.printStackTrace(System.err);
+ }
+ }
+ if (options.snapshot)
+ System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
+
+ cfs.getDataTracker().markCompacting(sstables);
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+ new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+
+ // Remove the sstable
+ sstable.markCompacted();
+ sstable.releaseReference();
+ }
+ catch (Exception e)
+ {
+ // NOTE(review): split failures are only reported on stderr; the process still
+ // reaches System.exit(0) below - confirm whether a non-zero status is wanted.
+ System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage()));
+ if (options.debug)
+ e.printStackTrace(System.err);
+ }
+ }
+ // Presumably blocks until the files of the sstables marked compacted above are deleted - TODO confirm.
+ SSTableDeletingTask.waitForDeletions();
+ System.exit(0); // We need that to stop non daemonized threads
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ if (options.debug)
+ e.printStackTrace(System.err);
+ System.exit(1);
+ }
+ }
+
+    /**
+     * Command line of the tool: the sstable file names to split plus the
+     * flags controlling verbosity, snapshotting and the output sstable size.
+     */
+    private static class Options
+    {
+        public final List<String> filenames;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean snapshot;
+        // Start from the documented default: without it an omitted --size leaves the
+        // value at 0, which the splitting task rejects as an invalid target size.
+        public int sizeInMB = DEFAULT_SSTABLE_SIZE;
+
+        private Options(List<String> filenames)
+        {
+            this.filenames = filenames;
+        }
+
+        /**
+         * Parses the command line.
+         *
+         * Prints the usage and terminates the JVM when help is requested
+         * (status 0), when no file name is given or when the arguments are
+         * invalid (status 1).
+         *
+         * @param cmdArgs the raw command line arguments
+         * @return the parsed options
+         */
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                {
+                    System.err.println("No sstables to split");
+                    printUsage(options);
+                    System.exit(1);
+                }
+                Options opts = new Options(Arrays.asList(args));
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.snapshot = !cmd.hasOption(NO_SNAPSHOT_OPTION);
+
+                if (cmd.hasOption(SIZE_OPTION))
+                    opts.sizeInMB = parseSizeInMB(cmd.getOptionValue(SIZE_OPTION));
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                // Not reached: errorMsg() exits the JVM. Only here to satisfy the compiler.
+                return null;
+            }
+        }
+
+        /**
+         * Converts the value of the size option to a strictly positive number of MB.
+         *
+         * @throws ParseException if the value is not an integer or is not > 0, so that
+         *         the problem is reported together with the usage like any other bad argument
+         */
+        private static int parseSizeInMB(String value) throws ParseException
+        {
+            int size;
+            try
+            {
+                size = Integer.parseInt(value);
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ParseException("Invalid value for --" + SIZE_OPTION + ", expected an integer number of MB (got: " + value + ")");
+            }
+
+            if (size <= 0)
+                throw new ParseException("Invalid value for --" + SIZE_OPTION + ", must be > 0 (got: " + size + ")");
+
+            return size;
+        }
+
+        /** Prints the message and the usage on failure, then exits with status 1. */
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        /** Declares the options understood by the tool. */
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION, "display stack traces");
+            options.addOption("v", VERBOSE_OPTION, "verbose output");
+            options.addOption("h", HELP_OPTION, "display this help message");
+            options.addOption(null, NO_SNAPSHOT_OPTION, "don't snapshot the sstables before splitting");
+            options.addOption("s", SIZE_OPTION, "size", "maximum size in MB for the output sstables (default: " + DEFAULT_SSTABLE_SIZE + ")");
+            return options;
+        }
+
+        /** Prints the usage line, a short description and the list of options. */
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <filename> [<filename>]*", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Split the provided sstables files in sstables of maximum provided file size (see option --" + SIZE_OPTION + ")." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}