You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/10/29 11:11:58 UTC

[2/2] git commit: Add CQL-aware SSTableWriter

Add CQL-aware SSTableWriter

patch by slebresne; reviewed by jbellis for CASSANDRA-5894


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97cbf6ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97cbf6ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97cbf6ad

Branch: refs/heads/cassandra-2.0
Commit: 97cbf6ad33af1fe3d16912bd3280b53bf9f22bb2
Parents: a999b15
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 29 11:11:05 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 29 11:11:05 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  10 +
 .../org/apache/cassandra/config/CFMetaData.java |   3 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |   2 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../cql3/statements/UpdateStatement.java        |   9 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  19 +
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 476 +++++++++++++++++++
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   2 +-
 .../io/sstable/SSTableSimpleWriter.java         |   2 +-
 .../cassandra/io/sstable/SSTableWriter.java     |  37 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  14 +-
 .../io/sstable/CQLSSTableWriterTest.java        | 115 +++++
 13 files changed, 671 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d60fae..7bf7f21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Update memtable size while flushing (CASSANDRA-6249)
  * Provide hooks around CQL2/CQL3 statement execution (CASSANDRA-6252)
  * Require Permission.SELECT for CAS updates (CASSANDRA-6247)
+ * New CQL-aware SSTableWriter (CASSANDRA-5894)
 Merged from 1.2:
  * Require logging in for Thrift CQL2/3 statement preparation (CASSANDRA-6254)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 69ab4fd..2489f26 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,16 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - A new CQLSSTableWriter class has been added. It is the equivalent of
+      the existing SSTableSimpleWriter/SSTableSimpleUnsortedWriter but is
+      CQL oriented.
+
+
+2.0.2
+=====
+
+New features
+------------
     - Speculative retry defaults to 99th percentile
       (See blog post at http://www.datastax.com/dev/blog/rapid-read-protection-in-cassandra-2-0-2)
     - Configurable metrics reporting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 20c35b3..f53c60c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -476,7 +476,8 @@ public final class CFMetaData
         return compile(cql, Keyspace.SYSTEM_KS);
     }
 
-    private static CFMetaData compile(String cql, String keyspace)
+    @VisibleForTesting
+    public static CFMetaData compile(String cql, String keyspace)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ec8b379..dc2649c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -365,7 +365,7 @@ public class QueryProcessor
             hook.processBatch(batch, context);
     }
 
-    private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
+    public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
     throws RequestValidationException
     {
         Tracing.trace("Parsing {}", queryStr);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 70bafb4..0f425b8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -205,7 +205,7 @@ public abstract class ModificationStatement implements CQLStatement
         }
     }
 
-    private List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+    public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();
@@ -241,7 +241,7 @@ public abstract class ModificationStatement implements CQLStatement
         return keys;
     }
 
-    private ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+    public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 89f17a7..12348df 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -45,11 +45,10 @@ public class UpdateStatement extends ModificationStatement
         return true;
     }
 
-    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();
-        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
 
         // Inserting the CQL row marker (see #4361)
         // We always need to insert a marker, because of the following situation:
@@ -97,7 +96,13 @@ public class UpdateStatement extends ModificationStatement
             for (Operation update : updates)
                 update.execute(key, cf, builder.copy(), params);
         }
+    }
 
+    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    throws InvalidRequestException
+    {
+        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+        addUpdateForKey(cf, key, builder, params);
         return cf;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 23f5c85..0059fda 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -164,6 +164,25 @@ public abstract class AbstractSSTableSimpleWriter
      */
     public abstract void close() throws IOException;
 
+    /**
+     * Package protected for use by CQLSSTableWriter.
+     * Not meant to be exposed publicly.
+     */
+    ColumnFamily currentColumnFamily()
+    {
+        return columnFamily;
+    }
+
+    /**
+     * Package protected for use by CQLSSTableWriter.
+     * Not meant to be exposed publicly.
+     */
+    DecoratedKey currentKey()
+    {
+        return currentKey;
+    }
+
+
     protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
 
     protected abstract ColumnFamily getColumnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
new file mode 100644
index 0000000..86348aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.statements.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Utility to write SSTables.
+ * <p>
+ * Typical usage looks like:
+ * <pre>
+ *   String schema = "CREATE TABLE myKs.myTable ("
+ *                 + "  k int PRIMARY KEY,"
+ *                 + "  v1 text,"
+ *                 + "  v2 int"
+ *                 + ")";
+ *   String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)";
+ *
+ *   // Creates a new writer. You need to provide at least the directory where to write the created sstable,
+ *   // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the
+ *   // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see
+ *   // CQLSSTableWriter.Builder for more details on the available options.
+ *   CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ *                                             .inDirectory("path/to/directory")
+ *                                             .forTable(schema)
+ *                                             .using(insert).build();
+ *
+ *   // Adds a number of rows to the resulting sstable
+ *   writer.addRow(0, "test1", 24);
+ *   writer.addRow(1, "test2", null);
+ *   writer.addRow(2, "test3", 42);
+ *
+ *   // Close the writer, finalizing the sstable
+ *   writer.close();
+ * </pre>
+ */
+public class CQLSSTableWriter
+{
+    private final AbstractSSTableSimpleWriter writer;
+    private final UpdateStatement insert;
+    private final List<ColumnSpecification> boundNames;
+
+    private CQLSSTableWriter(AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames)
+    {
+        this.writer = writer;
+        this.insert = insert;
+        this.boundNames = boundNames;
+    }
+
+    /**
+     * Returns a new builder for a CQLSSTableWriter.
+     *
+     * @return the new builder.
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * This is a shortcut for {@code addRow(Arrays.asList(values))}.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating this writer).
+     * @return this writer.
+     */
+    public CQLSSTableWriter addRow(Object... values)
+    throws InvalidRequestException, IOException
+    {
+        return addRow(Arrays.asList(values));
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * Each provided value type should correspond to the type of the CQL column
+     * the value is for. The correspondence between Java type and CQL type is the
+     * same as the one documented at
+     * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass().
+     * <p>
+     * If you prefer providing the values directly as binary, use
+     * {@link #rawAddRow} instead.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating this writer).
+     * @return this writer.
+     */
+    public CQLSSTableWriter addRow(List<Object> values)
+    throws InvalidRequestException, IOException
+    {
+        int size = Math.min(values.size(), boundNames.size());
+        List<ByteBuffer> rawValues = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+            rawValues.add(values.get(i) == null ? null : ((AbstractType)boundNames.get(i).type).decompose(values.get(i)));
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Adds a new row to the writer.
+     * <p>
+     * This is equivalent to the other addRow methods, but takes a map whose
+     * keys are the names of the columns to add instead of taking a list of the
+     * values in the order of the insert statement used to build this writer.
+     *
+     * @param values a map of column name to column value representing the new
+     * row to add. If a column is not part of the map, its value will be
+     * {@code null}. If the map contains keys that do not correspond to one of
+     * the columns of the insert statement used when creating this writer, the
+     * corresponding value is ignored.
+     * @return this writer.
+     */
+    public CQLSSTableWriter addRow(Map<String, Object> values)
+    throws InvalidRequestException, IOException
+    {
+        List<ByteBuffer> rawValues = new ArrayList<>(boundNames.size());
+        for (ColumnSpecification spec : boundNames)
+        {
+            // Bind every variable: a column absent from the map (or mapped to null) is bound to null.
+            Object value = values.get(spec.name.toString());
+            rawValues.add(value == null ? null : ((AbstractType)spec.type).decompose(value));
+        }
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}.
+     *
+     * @param values the row values (corresponding to the bind variables of the insertion statement used when creating this writer) as binary.
+     * @return this writer.
+     */
+    public CQLSSTableWriter rawAddRow(ByteBuffer... values)
+    throws InvalidRequestException, IOException
+    {
+        return rawAddRow(Arrays.asList(values));
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     * <p>
+     * Exactly one value must be provided per bind variable of the insertion statement, otherwise an {@code InvalidRequestException} is thrown.
+     *
+     * @param values the row values (corresponding to the bind variables of the
+     * insertion statement used when creating this writer) as binary.
+     * @return this writer.
+     */
+    public CQLSSTableWriter rawAddRow(List<ByteBuffer> values)
+    throws InvalidRequestException, IOException
+    {
+        if (values.size() != boundNames.size())
+            throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
+
+        List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
+        ColumnNameBuilder clusteringPrefix = insert.createClusteringPrefixBuilder(values);
+
+        long now = System.currentTimeMillis() * 1000;
+        UpdateParameters params = new UpdateParameters(insert.cfm,
+                                                       values,
+                                                       insert.getTimestamp(now, values),
+                                                       insert.getTimeToLive(values),
+                                                       Collections.<ByteBuffer, ColumnGroupMap>emptyMap());
+
+        for (ByteBuffer key: keys)
+        {
+            if (writer.currentKey() == null || !key.equals(writer.currentKey().key))
+                writer.newRow(key);
+            insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
+        }
+        return this;
+    }
+
+    /**
+     * Adds a new row to the writer given already serialized values.
+     * <p>
+     * This is equivalent to the other rawAddRow methods, but takes a map whose
+     * keys are the names of the columns to add instead of taking a list of the
+     * values in the order of the insert statement used during construction of
+     * this writer.
+     *
+     * @param values a map of column name to column value representing the new
+     * row to add. Note that if a column is not part of the map, its value will
+     * be {@code null}. If the map contains keys that do not correspond to one
+     * of the columns of the insert statement used when creating this writer,
+     * the corresponding value is ignored.
+     * @return this writer.
+     */
+    public CQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values)
+    throws InvalidRequestException, IOException
+    {
+        // Bind every variable of the insert statement, not just values.size() of
+        // them: a column absent from the map must be bound to null.
+        List<ByteBuffer> rawValues = new ArrayList<>(boundNames.size());
+        for (ColumnSpecification spec : boundNames)
+            rawValues.add(values.get(spec.name.toString()));
+
+        return rawAddRow(rawValues);
+    }
+
+    /**
+     * Close this writer.
+     * <p>
+     * This method should be called, otherwise the produced sstables are not
+     * guaranteed to be complete (and won't be in practice).
+     */
+    public void close() throws IOException
+    {
+        writer.close();
+    }
+
+    /**
+     * A Builder for a CQLSSTableWriter object.
+     */
+    public static class Builder
+    {
+        private File directory;
+        private IPartitioner partitioner = new Murmur3Partitioner();
+
+        private CFMetaData schema;
+        private UpdateStatement insert;
+        private List<ColumnSpecification> boundNames;
+
+        private boolean sorted = false;
+        private long bufferSizeInMB = 128;
+
+        private Builder() {}
+
+        /**
+         * The directory where to write the sstables.
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param directory the directory to use, which should exist and be writable.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+         */
+        public Builder inDirectory(String directory)
+        {
+            return inDirectory(new File(directory));
+        }
+
+        /**
+         * The directory where to write the sstables.
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param directory the directory to use, which should exist and be writable.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+         */
+        public Builder inDirectory(File directory)
+        {
+            if (!directory.exists())
+                throw new IllegalArgumentException(directory + " doesn't exists");
+            if (!directory.canWrite())
+                throw new IllegalArgumentException(directory + " exists but is not writable");
+
+            this.directory = directory;
+            return this;
+        }
+
+        /**
+         * The schema (CREATE TABLE statement) for the table for which sstables are to be created.
+         * <p>
+         * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified
+         * table name, one that includes the keyspace name.
+         * <p>
+         * This is a mandatory option.
+         *
+         * @param schema the schema of the table for which sstables are to be created.
+         * @return this builder.
+         *
+         * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement
+         * or does not have a fully-qualified table name.
+         */
+        public Builder forTable(String schema)
+        {
+            try
+            {
+                this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild();
+
+                // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
+                // build the insert statement in using().
+                KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName,
+                                                        AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+                                                        ImmutableMap.of("replication_factor", "1"),
+                                                        true,
+                                                        Collections.singleton(this.schema));
+
+                Schema.instance.load(ksm);
+                return this;
+            }
+            catch (RequestValidationException e)
+            {
+                throw new IllegalArgumentException(e.getMessage(), e);
+            }
+        }
+
+        /**
+         * The partitioner to use.
+         * <p>
+         * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
+         * by the cluster for which the SSTables are created, you need to use this method to
+         * provide the correct partitioner.
+         *
+         * @param partitioner the partitioner to use.
+         * @return this builder.
+         */
+        public Builder withPartitioner(IPartitioner partitioner)
+        {
+            this.partitioner = partitioner;
+            return this;
+        }
+
+        /**
+         * The INSERT statement defining the order of the values to add for a given CQL row.
+         * <p>
+         * Please note that the provided INSERT statement <b>must</b> use a fully-qualified
+         * table name, one that includes the keyspace name. Moreover, said statement must use
+         * bind variables since it is those bind variables that will be bound to values by the
+         * resulting writer.
+         * <p>
+         * This is a mandatory option, and this needs to be called after forTable().
+         *
+         * @param insertStatement an insertion statement that defines the order of column values to use.
+         * @return this builder.
+         *
+         * @throws IllegalStateException if {@code forTable()} has not been called first.
+         * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion
+         * statement, does not have a fully-qualified table name, is conditional or has no bind variables.
+         */
+        public Builder using(String insertStatement)
+        {
+            if (schema == null)
+                throw new IllegalStateException("You need to define the schema by calling forTable() prior to this call.");
+
+            Pair<UpdateStatement, List<ColumnSpecification>> p = getStatement(insertStatement, UpdateStatement.class, "INSERT");
+            this.insert = p.left;
+            this.boundNames = p.right;
+            if (this.insert.hasConditions())
+                throw new IllegalArgumentException("Conditional statements are not supported");
+            if (this.boundNames.isEmpty())
+                throw new IllegalArgumentException("Provided insert statement has no bind variables");
+            return this;
+        }
+
+        /**
+         * The size of the buffer to use.
+         * <p>
+         * This defines how much data will be buffered before being written as
+         * a new SSTable. This corresponds roughly to the data size that the created
+         * sstable will have.
+         * <p>
+         * The default is 128MB, which should be reasonable for a 1GB heap. If you experience
+         * OOM while using the writer, you should lower this value.
+         *
+         * @param size the size to use in MB.
+         * @return this builder.
+         */
+        public Builder withBufferSizeInMB(int size)
+        {
+            this.bufferSizeInMB = size;
+            return this;
+        }
+
+        /**
+         * Creates a CQLSSTableWriter that expects sorted inputs.
+         * <p>
+         * If this option is used, the resulting writer will expect rows to be
+         * added in SSTable sorted order (and an exception will be thrown if that
+         * is not the case during insertion). The SSTable sorted order means that
+         * rows are added such that their partition keys respect the partitioner
+         * order and, for a given partition, that the rows respect the clustering
+         * columns order.
+         * <p>
+         * You should thus only use this option if you know that you can provide
+         * the rows in order, which is rarely the case. If you can provide the
+         * rows in order however, using this option might be more efficient.
+         * <p>
+         * Note that if used, some options like withBufferSizeInMB will be ignored.
+         *
+         * @return this builder.
+         */
+        public Builder sorted()
+        {
+            this.sorted = true;
+            return this;
+        }
+
+        private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type)
+        {
+            try
+            {
+                ClientState state = ClientState.forInternalCalls();
+                ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, state);
+                CQLStatement stmt = prepared.statement;
+                stmt.validate(state);
+
+                if (!stmt.getClass().equals(klass))
+                    throw new IllegalArgumentException("Invalid query, must be a " + type + " statement");
+
+                return Pair.create(klass.cast(stmt), prepared.boundNames);
+            }
+            catch (RequestValidationException e)
+            {
+                throw new IllegalArgumentException(e.getMessage(), e);
+            }
+        }
+
+        public CQLSSTableWriter build()
+        {
+            if (directory == null)
+                throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()");
+            if (schema == null)
+                throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
+            if (insert == null)
+                throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
+
+            AbstractSSTableSimpleWriter writer;
+            if (sorted)
+            {
+                writer = new SSTableSimpleWriter(directory,
+                                                 schema,
+                                                 partitioner);
+            }
+            else
+            {
+                writer = new SSTableSimpleUnsortedWriter(directory,
+                                                         schema,
+                                                         partitioner,
+                                                         bufferSizeInMB);
+            }
+            return new CQLSSTableWriter(writer, insert, boundNames);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 52e5a03..6b39024 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -193,7 +193,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                     writer = getWriter();
                     for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
                         writer.append(entry.getKey(), entry.getValue());
-                    writer.closeAndOpenReader();
+                    writer.close();
                 }
             }
             catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index f0b45b5..9b584f0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -72,7 +72,7 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         {
             if (currentKey != null)
                 writeRow(currentKey, columnFamily);
-            writer.closeAndOpenReader();
+            writer.close();
         }
         catch (FSError e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5b3abfc..ac598bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.StreamingHistogram;
 
 public class SSTableWriter extends SSTable
@@ -307,20 +308,9 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader closeAndOpenReader(long maxDataAge)
     {
-        // index and filter
-        iwriter.close();
-        // main data, close will truncate if necessary
-        dataFile.close();
-        // write sstable statistics
-        SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
-                                                                                    metadata.getBloomFilterFpChance());
-        writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
-
-        // save the table of components
-        SSTable.appendTOC(descriptor, components);
-
-        // remove the 'tmp' marker from all components
-        final Descriptor newdesc = rename(descriptor, components);
+        Pair<Descriptor, SSTableMetadata> p = close();
+        Descriptor newdesc = p.left;
+        SSTableMetadata sstableMetadata = p.right;
 
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
@@ -344,6 +334,25 @@ public class SSTableWriter extends SSTable
         return sstable;
     }
 
+    // Close the writer and return the descriptor to the new sstable and its metadata
+    public Pair<Descriptor, SSTableMetadata> close()
+    {
+        // index and filter
+        iwriter.close();
+        // main data, close will truncate if necessary
+        dataFile.close();
+        // write sstable statistics
+        SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                                                    metadata.getBloomFilterFpChance());
+        writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
+
+        // save the table of components
+        SSTable.appendTOC(descriptor, components);
+
+        // remove the 'tmp' marker from all components
+        return Pair.create(rename(descriptor, components), sstableMetadata);
+    }
+
     private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata,  Set<Integer> ancestors)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 55cd329..f3cc38f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -107,7 +107,7 @@ public class SchemaLoader
         String ks_rcs = "RowCacheSpace";
         String ks_nocommit = "NoCommitlogSpace";
         String ks_prsi = "PerRowSecondaryIndex";
-
+        String ks_cql = "cql_keyspace";
 
         Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class;
 
@@ -290,6 +290,18 @@ public class SchemaLoader
                                            opts_rf1,
                                            perRowIndexedCFMD(ks_prsi, "Indexed1", withOldCfIds)));
 
+        // CQLKeyspace
+        schema.add(KSMetaData.testMetadata(ks_cql,
+                                           simple,
+                                           opts_rf1,
+
+                                           // Column Families
+                                           CFMetaData.compile("CREATE TABLE table1 ("
+                                                              + "k int PRIMARY KEY,"
+                                                              + "v1 text,"
+                                                              + "v2 int"
+                                                              + ")", ks_cql)));
+
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
             useCompression(schema);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
new file mode 100644
index 0000000..7095b35
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.util.Iterator;
+
+import com.google.common.io.Files;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class CQLSSTableWriterTest extends SchemaLoader
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    public void testUnsortedWriter() throws Exception
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table1";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+        if (!dataDir.mkdirs()) throw new AssertionError("Could not create " + dataDir); // not an assert: the directory must be created even without -ea
+
+        String schema = "CREATE TABLE cql_keyspace.table1 ("
+                      + "  k int PRIMARY KEY,"
+                      + "  v1 text,"
+                      + "  v2 int"
+                      + ")";
+        String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .withPartitioner(StorageService.instance.getPartitioner())
+                                                  .using(insert).build();
+
+        writer.addRow(0, "test1", 24);
+        writer.addRow(1, "test2", null); // null v2: verified below with row.has("v2")
+        writer.addRow(2, "test3", 42);
+        writer.close();
+
+        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        {
+            public void init(String keyspace)
+            {
+                for (Range<Token> range : StorageService.instance.getLocalRanges(keyspace)) // use the keyspace passed to init, not a hard-coded "Keyspace1"
+                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+                setPartitioner(StorageService.getPartitioner());
+            }
+
+            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            {
+                return Schema.instance.getCFMetaData(keyspace, cfName);
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        loader.stream().get();
+
+        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table1;");
+        assertEquals(3, rs.size());
+
+        Iterator<UntypedResultSet.Row> iter = rs.iterator();
+        UntypedResultSet.Row row;
+
+        row = iter.next();
+        assertEquals(0, row.getInt("k"));
+        assertEquals("test1", row.getString("v1"));
+        assertEquals(24, row.getInt("v2"));
+
+        row = iter.next();
+        assertEquals(1, row.getInt("k"));
+        assertEquals("test2", row.getString("v1"));
+        assertFalse(row.has("v2"));
+
+        row = iter.next();
+        assertEquals(2, row.getInt("k"));
+        assertEquals("test3", row.getString("v1"));
+        assertEquals(42, row.getInt("v2"));
+    }
+}