You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/10/29 11:11:58 UTC
[2/2] git commit: Add CQL-aware SSTableWriter
Add CQL-aware SSTableWriter
patch by slebresne; reviewed by jbellis for CASSANDRA-5894
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97cbf6ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97cbf6ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97cbf6ad
Branch: refs/heads/cassandra-2.0
Commit: 97cbf6ad33af1fe3d16912bd3280b53bf9f22bb2
Parents: a999b15
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Oct 29 11:11:05 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Oct 29 11:11:05 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 10 +
.../org/apache/cassandra/config/CFMetaData.java | 3 +-
.../apache/cassandra/cql3/QueryProcessor.java | 2 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../cql3/statements/UpdateStatement.java | 9 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 19 +
.../cassandra/io/sstable/CQLSSTableWriter.java | 476 +++++++++++++++++++
.../io/sstable/SSTableSimpleUnsortedWriter.java | 2 +-
.../io/sstable/SSTableSimpleWriter.java | 2 +-
.../cassandra/io/sstable/SSTableWriter.java | 37 +-
.../unit/org/apache/cassandra/SchemaLoader.java | 14 +-
.../io/sstable/CQLSSTableWriterTest.java | 115 +++++
13 files changed, 671 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d60fae..7bf7f21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Update memtable size while flushing (CASSANDRA-6249)
* Provide hooks around CQL2/CQL3 statement execution (CASSANDRA-6252)
* Require Permission.SELECT for CAS updates (CASSANDRA-6247)
+ * New CQL-aware SSTableWriter (CASSANDRA-5894)
Merged from 1.2:
* Require logging in for Thrift CQL2/3 statement preparation (CASSANDRA-6254)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 69ab4fd..2489f26 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,16 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - A new CQLSSTableWriter class has been added. It is the equivalent of
+ the existing SSTableSimpleWriter/SSTableSimpleUnsortedWriter but is
+ CQL oriented.
+
+
+2.0.2
+=====
+
+New features
+------------
- Speculative retry defaults to 99th percentile
(See blog post at http://www.datastax.com/dev/blog/rapid-read-protection-in-cassandra-2-0-2)
- Configurable metrics reporting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 20c35b3..f53c60c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -476,7 +476,8 @@ public final class CFMetaData
return compile(cql, Keyspace.SYSTEM_KS);
}
- private static CFMetaData compile(String cql, String keyspace)
+ @VisibleForTesting
+ public static CFMetaData compile(String cql, String keyspace)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ec8b379..dc2649c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -365,7 +365,7 @@ public class QueryProcessor
hook.processBatch(batch, context);
}
- private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
+ public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
Tracing.trace("Parsing {}", queryStr);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 70bafb4..0f425b8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -205,7 +205,7 @@ public abstract class ModificationStatement implements CQLStatement
}
}
- private List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+ public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
throws InvalidRequestException
{
CFDefinition cfDef = cfm.getCfDef();
@@ -241,7 +241,7 @@ public abstract class ModificationStatement implements CQLStatement
return keys;
}
- private ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+ public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
throws InvalidRequestException
{
CFDefinition cfDef = cfm.getCfDef();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 89f17a7..12348df 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -45,11 +45,10 @@ public class UpdateStatement extends ModificationStatement
return true;
}
- public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
throws InvalidRequestException
{
CFDefinition cfDef = cfm.getCfDef();
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
// Inserting the CQL row marker (see #4361)
// We always need to insert a marker, because of the following situation:
@@ -97,7 +96,13 @@ public class UpdateStatement extends ModificationStatement
for (Operation update : updates)
update.execute(key, cf, builder.copy(), params);
}
+ }
+ public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ throws InvalidRequestException
+ {
+ ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+ addUpdateForKey(cf, key, builder, params);
return cf;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 23f5c85..0059fda 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -164,6 +164,25 @@ public abstract class AbstractSSTableSimpleWriter
*/
public abstract void close() throws IOException;
+ /**
+ * Returns this writer's {@code columnFamily} field as is.
+ * <p>
+ * Package protected for use by CQLSSTableWriter.
+ * Not meant to be exposed publicly.
+ *
+ * @return the current value of the {@code columnFamily} field.
+ */
+ ColumnFamily currentColumnFamily()
+ {
+ return columnFamily;
+ }
+
+ /**
+ * Returns this writer's {@code currentKey} field as is.
+ * <p>
+ * Package protected for use by CQLSSTableWriter.
+ * Not meant to be exposed publicly.
+ *
+ * @return the current value of the {@code currentKey} field.
+ */
+ DecoratedKey currentKey()
+ {
+ return currentKey;
+ }
+
+
protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
protected abstract ColumnFamily getColumnFamily();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
new file mode 100644
index 0000000..86348aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.statements.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Utility to write SSTables.
+ * <p>
+ * Typical usage looks like:
+ * <pre>
+ * String schema = "CREATE TABLE myKs.myTable ("
+ * + " k int PRIMARY KEY,"
+ * + " v1 text,"
+ * + " v2 int"
+ * + ")";
+ * String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)";
+ *
+ * // Creates a new writer. You need to provide at least the directory where to write the created sstable,
+ * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the
+ * // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see
+ * // CQLSSTableWriter.Builder for more details on the available options.
+ * CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ * .inDirectory("path/to/directory")
+ * .forTable(schema)
+ * .using(insert).build();
+ *
+ * // Adds a number of rows to the resulting sstable
+ * writer.addRow(0, "test1", 24);
+ * writer.addRow(1, "test2", null);
+ * writer.addRow(2, "test3", 42);
+ *
+ * // Close the writer, finalizing the sstable
+ * writer.close();
+ * </pre>
+ */
+public class CQLSSTableWriter
+{
+ private final AbstractSSTableSimpleWriter writer;
+ private final UpdateStatement insert;
+ private final List<ColumnSpecification> boundNames;
+
+ private CQLSSTableWriter(AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames)
+ {
+ this.writer = writer;
+ this.insert = insert;
+ this.boundNames = boundNames;
+ }
+
+ /**
+ * Returns a new builder for a CQLSSTableWriter.
+ *
+ * @return the new builder.
+ */
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * Adds a new row to the writer.
+ * <p>
+ * This is a shortcut for {@code addRow(Arrays.asList(values))}.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating this writer).
+ * @return this writer.
+ *
+ * @throws InvalidRequestException as thrown by {@code addRow(List)}.
+ * @throws IOException as thrown by {@code addRow(List)}.
+ */
+ public CQLSSTableWriter addRow(Object... values)
+ throws InvalidRequestException, IOException
+ {
+ return addRow(Arrays.asList(values));
+ }
+
+ /**
+ * Adds a new row to the writer, converting each Java value to its binary form.
+ * <p>
+ * The type of each provided value should match the CQL type of the column
+ * the value is bound to. The correspondence between Java types and CQL types
+ * is the one documented at
+ * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass().
+ * A {@code null} value is passed through as {@code null} without conversion.
+ * <p>
+ * To provide values that are already serialized, use
+ * {@link #rawAddRow} instead.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating this writer).
+ * @return this writer.
+ */
+ public CQLSSTableWriter addRow(List<Object> values)
+ throws InvalidRequestException, IOException
+ {
+ // Only positions that have both a value and a bind variable are converted;
+ // the resulting list is handed to rawAddRow as is.
+ int count = Math.min(values.size(), boundNames.size());
+ List<ByteBuffer> serialized = new ArrayList<>(count);
+ for (int idx = 0; idx < count; idx++)
+ {
+ Object value = values.get(idx);
+ if (value == null)
+ {
+ serialized.add(null);
+ continue;
+ }
+ AbstractType type = (AbstractType)boundNames.get(idx).type;
+ serialized.add(type.decompose(value));
+ }
+ return rawAddRow(serialized);
+ }
+
+ /**
+ * Adds a new row to the writer.
+ * <p>
+ * This is equivalent to the other addRow methods, but takes a map whose
+ * keys are the names of the columns to add instead of taking a list of the
+ * values in the order of the insert statement used during construction of
+ * this writer.
+ *
+ * @param values a map of column name to column values representing the new
+ * row to add. Note that if a column is not part of the map, its value will
+ * be {@code null}. If the map contains keys that do not correspond to one
+ * of the columns of the insert statement used when creating this writer,
+ * the corresponding value is ignored.
+ * @return this writer.
+ */
+ public CQLSSTableWriter addRow(Map<String, Object> values)
+ throws InvalidRequestException, IOException
+ {
+ // Always produce exactly one value per bind variable. Sizing the list by the
+ // map would drop trailing bind variables whenever the map has fewer entries
+ // than the statement has variables, and rawAddRow would then reject the row.
+ int size = boundNames.size();
+ List<ByteBuffer> rawValues = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ ColumnSpecification spec = boundNames.get(i);
+ Object value = values.get(spec.name.toString());
+ // A column absent from the map (or mapped to null) stays null instead of
+ // being handed to decompose().
+ rawValues.add(value == null ? null : ((AbstractType)spec.type).decompose(value));
+ }
+ return rawAddRow(rawValues);
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ * <p>
+ * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating this writer) as binary.
+ * @return this writer.
+ */
+ public CQLSSTableWriter rawAddRow(ByteBuffer... values)
+ throws InvalidRequestException, IOException
+ {
+ return rawAddRow(Arrays.asList(values));
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ * <p>
+ * The values are bound to the insert statement used when creating this
+ * writer. For each partition key built from them, a new row is started on
+ * the underlying writer unless that key equals the writer's current key,
+ * and the update is then added to the writer's current column family. The
+ * current time in milliseconds multiplied by 1000 is passed to
+ * {@code insert.getTimestamp} along with the values.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating this writer) as binary.
+ * @return this writer.
+ *
+ * @throws InvalidRequestException if the number of values differs from the
+ * number of bind variables of the insert statement.
+ */
+ public CQLSSTableWriter rawAddRow(List<ByteBuffer> values)
+ throws InvalidRequestException, IOException
+ {
+ if (values.size() != boundNames.size())
+ throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
+
+ List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
+ ColumnNameBuilder clusteringPrefix = insert.createClusteringPrefixBuilder(values);
+
+ long now = System.currentTimeMillis() * 1000;
+ UpdateParameters params = new UpdateParameters(insert.cfm,
+ values,
+ insert.getTimestamp(now, values),
+ insert.getTimeToLive(values),
+ Collections.<ByteBuffer, ColumnGroupMap>emptyMap());
+
+ for (ByteBuffer key: keys)
+ {
+ if (writer.currentKey() == null || !key.equals(writer.currentKey().key))
+ writer.newRow(key);
+ insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
+ }
+ return this;
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ * <p>
+ * This is equivalent to the other rawAddRow methods, but takes a map whose
+ * keys are the names of the columns to add instead of taking a list of the
+ * values in the order of the insert statement used during construction of
+ * this writer.
+ *
+ * @param values a map of column name to column values representing the new
+ * row to add. Note that if a column is not part of the map, its value will
+ * be {@code null}. If the map contains keys that do not correspond to one
+ * of the columns of the insert statement used when creating this writer,
+ * the corresponding value is ignored.
+ * @return this writer.
+ */
+ public CQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values)
+ throws InvalidRequestException, IOException
+ {
+ // Always produce exactly one value per bind variable. Sizing the list by the
+ // map would drop trailing bind variables whenever the map has fewer entries
+ // than the statement has variables, and rawAddRow(List) would then reject
+ // the row instead of treating the missing columns as null.
+ int size = boundNames.size();
+ List<ByteBuffer> rawValues = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ ColumnSpecification spec = boundNames.get(i);
+ // Map.get returns null for a column that is not part of the map.
+ rawValues.add(values.get(spec.name.toString()));
+ }
+ return rawAddRow(rawValues);
+ }
+
+ /**
+ * Closes this writer by closing the underlying sstable writer.
+ * <p>
+ * This method should be called, otherwise the produced sstables are not
+ * guaranteed to be complete (and won't be in practice).
+ *
+ * @throws IOException if closing the underlying writer fails.
+ */
+ public void close() throws IOException
+ {
+ writer.close();
+ }
+
+ /**
+ * A Builder for a CQLSSTableWriter object.
+ */
+ public static class Builder
+ {
+ private File directory;
+ private IPartitioner partitioner = new Murmur3Partitioner();
+
+ private CFMetaData schema;
+ private UpdateStatement insert;
+ private List<ColumnSpecification> boundNames;
+
+ private boolean sorted = false;
+ private long bufferSizeInMB = 128;
+
+ private Builder() {}
+
+ /**
+ * The directory where to write the sstables.
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param directory the path of the directory to use, which should exist and be writable.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+ */
+ public Builder inDirectory(String directory)
+ {
+ return inDirectory(new File(directory));
+ }
+
+ /**
+ * The directory where to write the sstables.
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param directory the directory to use, which should exist and be writable.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+ */
+ public Builder inDirectory(File directory)
+ {
+ if (!directory.exists())
+ throw new IllegalArgumentException(directory + " doesn't exists");
+ if (!directory.canWrite())
+ throw new IllegalArgumentException(directory + " exists but is not writable");
+
+ this.directory = directory;
+ return this;
+ }
+
+ /**
+ * The schema (CREATE TABLE statement) for the table for which sstables are to be created.
+ * <p>
+ * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified
+ * table name, one that includes the keyspace name.
+ * <p>
+ * As a side effect, this loads into {@code Schema.instance} a keyspace definition named
+ * after the table's keyspace, using SimpleStrategy with a replication factor of 1 and
+ * containing only this table.
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param schema the schema of the table for which sstables are to be created.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement
+ * or does not have a fully-qualified table name.
+ */
+ public Builder forTable(String schema)
+ {
+ try
+ {
+ this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild();
+
+ // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
+ // build the insert statement in using().
+ KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName,
+ AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+ ImmutableMap.of("replication_factor", "1"),
+ true,
+ Collections.singleton(this.schema));
+
+ Schema.instance.load(ksm);
+ return this;
+ }
+ catch (RequestValidationException e)
+ {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * The partitioner to use.
+ * <p>
+ * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
+ * by the cluster for which the SSTables are created, you need to use this method to
+ * provide the correct partitioner.
+ *
+ * @param partitioner the partitioner to use.
+ * @return this builder.
+ */
+ public Builder withPartitioner(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+ /**
+ * The INSERT statement defining the order of the values to add for a given CQL row.
+ * <p>
+ * Please note that the provided INSERT statement <b>must</b> use a fully-qualified
+ * table name, one that includes the keyspace name. Moreover, said statement must use
+ * bind variables since it is those bind variables that will be bound to values by the
+ * resulting writer.
+ * <p>
+ * This is a mandatory option, and this needs to be called after forTable().
+ *
+ * @param insertStatement an insertion statement that defines the order
+ * of column values to use.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion
+ * statement, does not have a fully-qualified table name, has conditions or has no bind variables.
+ * @throws IllegalStateException if no schema has been set through forTable() yet.
+ */
+ public Builder using(String insertStatement)
+ {
+ if (schema == null)
+ throw new IllegalStateException("You need to define the schema by calling forTable() prior to this call.");
+
+ Pair<UpdateStatement, List<ColumnSpecification>> p = getStatement(insertStatement, UpdateStatement.class, "INSERT");
+ this.insert = p.left;
+ this.boundNames = p.right;
+ if (this.insert.hasConditions())
+ throw new IllegalArgumentException("Conditional statements are not supported");
+ if (this.boundNames.isEmpty())
+ throw new IllegalArgumentException("Provided insert statement has no bind variables");
+ return this;
+ }
+
+ /**
+ * The size of the buffer to use.
+ * <p>
+ * This defines how much data will be buffered before being written as
+ * a new SSTable. This corresponds roughly to the data size of each created
+ * sstable.
+ * <p>
+ * The default is 128MB, which should be reasonable for a 1GB heap. If you experience
+ * OOM while using the writer, you should lower this value.
+ *
+ * @param size the size to use in MB.
+ * @return this builder.
+ */
+ public Builder withBufferSizeInMB(int size)
+ {
+ this.bufferSizeInMB = size;
+ return this;
+ }
+
+ /**
+ * Creates a CQLSSTableWriter that expects sorted inputs.
+ * <p>
+ * If this option is used, the resulting writer will expect rows to be
+ * added in SSTable sorted order (and an exception will be thrown if that
+ * is not the case during insertion). The SSTable sorted order means that
+ * rows are added such that their partition keys respect the partitioner
+ * order and, for a given partition, that the rows respect the clustering
+ * columns order.
+ * <p>
+ * You should thus only use this option if you know that you can provide
+ * the rows in order, which is rarely the case. If you can provide the
+ * rows in order however, using this option might be more efficient.
+ * <p>
+ * Note that if used, some options, like withBufferSizeInMB, will be ignored.
+ *
+ * @return this builder.
+ */
+ public Builder sorted()
+ {
+ this.sorted = true;
+ return this;
+ }
+
+ /**
+ * Parses and validates {@code query} for internal calls and checks that the
+ * resulting statement is exactly of the expected class.
+ *
+ * @param query the CQL query string to prepare.
+ * @param klass the exact class the prepared statement must have.
+ * @param type the name of the expected statement kind, used in the error message.
+ * @return the statement cast to {@code klass}, paired with the specifications of its bound variables.
+ *
+ * @throws IllegalArgumentException if the statement is of another class, or if parsing or
+ * validating the query raises a {@code RequestValidationException}.
+ */
+ private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type)
+ {
+ ParsedStatement.Prepared prepared;
+ try
+ {
+ ClientState internalState = ClientState.forInternalCalls();
+ prepared = QueryProcessor.getStatement(query, internalState);
+ prepared.statement.validate(internalState);
+ }
+ catch (RequestValidationException e)
+ {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+
+ CQLStatement parsed = prepared.statement;
+ if (!parsed.getClass().equals(klass))
+ throw new IllegalArgumentException("Invalid query, must be a " + type + " statement");
+
+ return Pair.create(klass.cast(parsed), prepared.boundNames);
+ }
+
+ public CQLSSTableWriter build()
+ {
+ if (directory == null)
+ throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()");
+ if (schema == null)
+ throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
+ if (insert == null)
+ throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
+
+ AbstractSSTableSimpleWriter writer;
+ if (sorted)
+ {
+ writer = new SSTableSimpleWriter(directory,
+ schema,
+ partitioner);
+ }
+ else
+ {
+ writer = new SSTableSimpleUnsortedWriter(directory,
+ schema,
+ partitioner,
+ bufferSizeInMB);
+ }
+ return new CQLSSTableWriter(writer, insert, boundNames);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 52e5a03..6b39024 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -193,7 +193,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
writer = getWriter();
for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
writer.append(entry.getKey(), entry.getValue());
- writer.closeAndOpenReader();
+ writer.close();
}
}
catch (Throwable e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index f0b45b5..9b584f0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -72,7 +72,7 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
{
if (currentKey != null)
writeRow(currentKey, columnFamily);
- writer.closeAndOpenReader();
+ writer.close();
}
catch (FSError e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5b3abfc..ac598bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.StreamingHistogram;
public class SSTableWriter extends SSTable
@@ -307,20 +308,9 @@ public class SSTableWriter extends SSTable
public SSTableReader closeAndOpenReader(long maxDataAge)
{
- // index and filter
- iwriter.close();
- // main data, close will truncate if necessary
- dataFile.close();
- // write sstable statistics
- SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance());
- writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
-
- // save the table of components
- SSTable.appendTOC(descriptor, components);
-
- // remove the 'tmp' marker from all components
- final Descriptor newdesc = rename(descriptor, components);
+ Pair<Descriptor, SSTableMetadata> p = close();
+ Descriptor newdesc = p.left;
+ SSTableMetadata sstableMetadata = p.right;
// finalize in-memory state for the reader
SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
@@ -344,6 +334,25 @@ public class SSTableWriter extends SSTable
return sstable;
}
+ // Close the writer and return the descriptor to the new sstable and its metadata
+ public Pair<Descriptor, SSTableMetadata> close()
+ {
+ // index and filter
+ iwriter.close();
+ // main data, close will truncate if necessary
+ dataFile.close();
+ // write sstable statistics
+ SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance());
+ writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
+
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+
+ // remove the 'tmp' marker from all components
+ return Pair.create(rename(descriptor, components), sstableMetadata);
+ }
+
private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata, Set<Integer> ancestors)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 55cd329..f3cc38f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -107,7 +107,7 @@ public class SchemaLoader
String ks_rcs = "RowCacheSpace";
String ks_nocommit = "NoCommitlogSpace";
String ks_prsi = "PerRowSecondaryIndex";
-
+ String ks_cql = "cql_keyspace";
Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class;
@@ -290,6 +290,18 @@ public class SchemaLoader
opts_rf1,
perRowIndexedCFMD(ks_prsi, "Indexed1", withOldCfIds)));
+ // CQLKeyspace
+ schema.add(KSMetaData.testMetadata(ks_cql,
+ simple,
+ opts_rf1,
+
+ // Column Families
+ CFMetaData.compile("CREATE TABLE table1 ("
+ + "k int PRIMARY KEY,"
+ + "v1 text,"
+ + "v2 int"
+ + ")", ks_cql)));
+
if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
useCompression(schema);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/97cbf6ad/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
new file mode 100644
index 0000000..7095b35
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.util.Iterator;
+
+import com.google.common.io.Files;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+public class CQLSSTableWriterTest extends SchemaLoader
+{
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ StorageService.instance.initServer();
+ }
+
+ /**
+ * Writes three rows through an unsorted CQLSSTableWriter, streams the
+ * resulting sstable with SSTableLoader and checks the rows can be read
+ * back through a CQL query.
+ */
+ @Test
+ public void testUnsortedWriter() throws Exception
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table1";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ // Create the directory outside of an assert statement: with assertions disabled the
+ // asserted expression is not evaluated and the directory would never be created.
+ if (!dataDir.mkdirs())
+ throw new AssertionError("Unable to create directory " + dataDir);
+
+ String schema = "CREATE TABLE cql_keyspace.table1 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int"
+ + ")";
+ String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .withPartitioner(StorageService.instance.getPartitioner())
+ .using(insert).build();
+
+ writer.addRow(0, "test1", 24);
+ writer.addRow(1, "test2", null);
+ writer.addRow(2, "test3", 42);
+ writer.close();
+
+ SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+ {
+ public void init(String keyspace)
+ {
+ for (Range<Token> range : StorageService.instance.getLocalRanges("Keyspace1"))
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+ setPartitioner(StorageService.getPartitioner());
+ }
+
+ public CFMetaData getCFMetaData(String keyspace, String cfName)
+ {
+ return Schema.instance.getCFMetaData(keyspace, cfName);
+ }
+ }, new OutputHandler.SystemOutput(false, false));
+
+ loader.stream().get();
+
+ UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table1;");
+ assertEquals(3, rs.size());
+
+ Iterator<UntypedResultSet.Row> iter = rs.iterator();
+ UntypedResultSet.Row row;
+
+ row = iter.next();
+ assertEquals(0, row.getInt("k"));
+ assertEquals("test1", row.getString("v1"));
+ assertEquals(24, row.getInt("v2"));
+
+ // The second row was written with a null v2, so that column must be absent.
+ row = iter.next();
+ assertEquals(1, row.getInt("k"));
+ assertEquals("test2", row.getString("v1"));
+ assertFalse(row.has("v2"));
+
+ row = iter.next();
+ assertEquals(2, row.getInt("k"));
+ assertEquals("test3", row.getString("v1"));
+ assertEquals(42, row.getInt("v2"));
+ }
+}