You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/09/12 15:49:20 UTC
cassandra git commit: Put CQLSSTableWriter back to the old
interface/behavior before CASSANDRA-11844
Repository: cassandra
Updated Branches:
refs/heads/trunk 3f49c328f -> 0026e4eee
Put CQLSSTableWriter back to the old interface/behavior before CASSANDRA-11844
Patch by Jeremiah Jordan; reviewed by Jake Luciani for CASSANDRA-12551
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0026e4ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0026e4ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0026e4ee
Branch: refs/heads/trunk
Commit: 0026e4eeec23367c74c44b23a9586562b939f6f8
Parents: 3f49c32
Author: Jeremiah D Jordan <je...@datastax.com>
Authored: Fri Sep 2 10:06:39 2016 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Sep 12 11:48:45 2016 -0400
----------------------------------------------------------------------
.../hadoop/cql3/CqlBulkRecordWriter.java | 4 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 29 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 148 ++--
.../cassandra/io/sstable/SSTableLoader.java | 16 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 12 +-
.../io/sstable/SSTableSimpleWriter.java | 8 +-
.../cassandra/io/sstable/SSTableTxnWriter.java | 10 +-
.../cassandra/streaming/LongStreamingTest.java | 7 +-
.../db/lifecycle/RealTransactionsTest.java | 4 +-
.../io/sstable/CQLSSTableWriterClientTest.java | 16 +-
.../io/sstable/CQLSSTableWriterTest.java | 37 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 38 +-
.../io/sstable/StressCQLSSTableWriter.java | 672 +++++++++++++++++++
.../cassandra/stress/CompactionStress.java | 20 +-
.../apache/cassandra/stress/StressProfile.java | 2 +-
.../operations/userdefined/SchemaInsert.java | 20 +-
16 files changed, 813 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index bd157e9..2ed37ee 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -75,7 +75,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
protected final Configuration conf;
protected final int maxFailures;
protected final int bufferSize;
- protected CQLSSTableWriter writer;
+ protected Closeable writer;
protected SSTableLoader loader;
protected Progressable progress;
protected TaskAttemptContext context;
@@ -174,7 +174,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
ExternalClient externalClient = new ExternalClient(conf);
externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
- loader = new SSTableLoader(writer.getInnermostDirectory(), externalClient, new NullOutputHandler())
+ loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
{
@Override
public void onSuccess(StreamState finalState)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f989878..9a8f968 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,13 +22,15 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.Closeable;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
@@ -38,17 +40,17 @@ import org.apache.cassandra.utils.Pair;
*/
abstract class AbstractSSTableSimpleWriter implements Closeable
{
- protected final ColumnFamilyStore cfs;
- protected final IPartitioner partitioner;
+ protected final File directory;
+ protected final CFMetaData metadata;
protected final PartitionColumns columns;
protected SSTableFormat.Type formatType = SSTableFormat.Type.current();
protected static AtomicInteger generation = new AtomicInteger(0);
protected boolean makeRangeAware = false;
- protected AbstractSSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns)
+ protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
- this.cfs = cfs;
- this.partitioner = partitioner;
+ this.metadata = metadata;
+ this.directory = directory;
this.columns = columns;
}
@@ -65,17 +67,18 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
protected SSTableTxnWriter createWriter()
{
- SerializationHeader header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
+ SerializationHeader header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
if (makeRangeAware)
- return SSTableTxnWriter.createRangeAware(cfs, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+ return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
- return SSTableTxnWriter.create(cfs,
- createDescriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata.ksName, cfs.metadata.cfName, formatType),
+ return SSTableTxnWriter.create(metadata,
+ createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
0,
- header);
+ header,
+ Collections.emptySet());
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
@@ -115,7 +118,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
{
- return getUpdateFor(partitioner.decorateKey(key));
+ return getUpdateFor(metadata.decorateKey(key));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index e668b75..8a9d01d 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -21,14 +21,17 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
import java.util.stream.Collectors;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.TypeCodec;
-import com.sun.org.apache.xpath.internal.operations.Bool;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.ColumnSpecification;
@@ -40,7 +43,8 @@ import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.cql3.statements.CreateTypeStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.dht.IPartitioner;
@@ -49,12 +53,10 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Types;
import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -331,30 +333,14 @@ public class CQLSSTableWriter implements Closeable
return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
}
/**
- * The writer loads data in directories corresponding to how they laid out on the server.
- * <p>
- * {keyspace}/{table-cfid}/
- *
- * This method can be used to fetch the innermost directory with the sstable components
- * @return The directory containing the sstable components
- */
- public File getInnermostDirectory()
- {
- return writer.cfs.getDirectories().getDirectoryForNewSSTables();
- }
-
- /**
* A Builder for a CQLSSTableWriter object.
*/
public static class Builder
{
- private final List<File> directoryList;
- private ColumnFamilyStore cfs;
+ private File directory;
protected SSTableFormat.Type formatType = null;
- private Boolean makeRangeAware = false;
-
private CreateTableStatement.RawStatement schemaStatement;
private final List<CreateTypeStatement> typeStatements;
private UpdateStatement.ParsedInsert insertStatement;
@@ -363,10 +349,8 @@ public class CQLSSTableWriter implements Closeable
private boolean sorted = false;
private long bufferSizeInMB = 128;
- protected Builder()
- {
+ protected Builder() {
this.typeStatements = new ArrayList<>();
- this.directoryList = new ArrayList<>();
}
/**
@@ -389,7 +373,7 @@ public class CQLSSTableWriter implements Closeable
* <p>
* This is a mandatory option.
*
- * @param directory the directory to use, which should exist and be writable.
+ * @param directory the directory to use, which should exists and be writable.
* @return this builder.
*
* @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
@@ -401,29 +385,10 @@ public class CQLSSTableWriter implements Closeable
if (!directory.canWrite())
throw new IllegalArgumentException(directory + " exists but is not writable");
- directoryList.add(directory);
+ this.directory = directory;
return this;
}
- /**
- * A pre-instanciated ColumnFamilyStore
- * <p>
- * This is can be used in place of inDirectory and forTable
- *
- * @see #inDirectory(File)
- *
- * @param cfs the list of directories to use, which should exist and be writable.
- * @return this builder.
- *
- * @throws IllegalArgumentException if a directory doesn't exist or is not writable.
- */
- public Builder withCfs(ColumnFamilyStore cfs)
- {
- this.cfs = cfs;
- return this;
- }
-
-
public Builder withType(String typeDefinition) throws SyntaxException
{
typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE"));
@@ -466,20 +431,6 @@ public class CQLSSTableWriter implements Closeable
return this;
}
-
- /**
- * Specify if the sstable writer should be vnode range aware.
- * This will create a sstable per vnode range.
- *
- * @param makeRangeAware
- * @return
- */
- public Builder rangeAware(boolean makeRangeAware)
- {
- this.makeRangeAware = makeRangeAware;
- return this;
- }
-
/**
* The INSERT statement defining the order of the values to add for a given CQL row.
* <p>
@@ -548,36 +499,36 @@ public class CQLSSTableWriter implements Closeable
@SuppressWarnings("resource")
public CQLSSTableWriter build()
{
- if (directoryList.isEmpty() && cfs == null)
- throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()");
- if (schemaStatement == null && cfs == null)
+ if (directory == null)
+ throw new IllegalStateException("No ouptut directory specified, you should provide a directory with inDirectory()");
+ if (schemaStatement == null)
throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
if (insertStatement == null)
throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
synchronized (CQLSSTableWriter.class)
{
- if (cfs == null)
- cfs = createOfflineTable(schemaStatement, typeStatements, directoryList);
+ String keyspace = schemaStatement.keyspace();
- if (partitioner == null)
- partitioner = cfs.getPartitioner();
+ if (Schema.instance.getKSMetaData(keyspace) == null)
+ Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+ createTypes(keyspace);
+ CFMetaData cfMetaData = createTable(keyspace);
Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert();
+
AbstractSSTableSimpleWriter writer = sorted
- ? new SSTableSimpleWriter(cfs, partitioner, preparedInsert.left.updatedColumns())
- : new SSTableSimpleUnsortedWriter(cfs, partitioner, preparedInsert.left.updatedColumns(), bufferSizeInMB);
+ ? new SSTableSimpleWriter(directory, cfMetaData, preparedInsert.left.updatedColumns())
+ : new SSTableSimpleUnsortedWriter(directory, cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB);
if (formatType != null)
writer.setSSTableFormatType(formatType);
- writer.setRangeAwareWriting(makeRangeAware);
-
return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right);
}
}
- private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements)
+ private void createTypes(String keyspace)
{
KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
Types.RawBuilder builder = Types.rawBuilder(keyspace);
@@ -587,50 +538,31 @@ public class CQLSSTableWriter implements Closeable
ksm = ksm.withSwapped(builder.build());
Schema.instance.setKeyspaceMetadata(ksm);
}
-
- public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList)
- {
- return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList);
- }
-
/**
* Creates the table according to schema statement
- * with specified data directories
+ *
+ * @param keyspace name of the keyspace where table should be created
*/
- public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList)
+ private CFMetaData createTable(String keyspace)
{
- String keyspace = schemaStatement.keyspace();
-
- if (Schema.instance.getKSMetaData(keyspace) == null)
- Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
-
- createTypes(keyspace, typeStatements);
-
KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
- assert cfMetaData == null;
-
- CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
- statement.validate(ClientState.forInternalCalls());
-
- //Build metatdata with a portable cfId
- cfMetaData = statement.metadataBuilder()
- .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
- .build()
- .params(statement.params());
-
- Keyspace.setInitialized();
- Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+ if (cfMetaData == null)
+ {
+ CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
+ statement.validate(ClientState.forInternalCalls());
- Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
- ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true);
+ cfMetaData = statement.getCFMetaData();
- ks.initCfCustom(cfs);
- Schema.instance.load(cfs.metadata);
- Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
+ Schema.instance.load(cfMetaData);
+ Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
+ }
- return cfs;
+ if (partitioner != null)
+ return cfMetaData.copy(partitioner);
+ else
+ return cfMetaData;
}
/**
@@ -655,7 +587,7 @@ public class CQLSSTableWriter implements Closeable
}
}
- public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
+ private static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 15dd925..043f6fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,25 +56,13 @@ public class SSTableLoader implements StreamEventHandler
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
- this(directory, client, outputHandler, directory.getParentFile().getName(), 1);
+ this(directory, client, outputHandler, 1);
}
-
public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
{
- this(directory, client, outputHandler, directory.getParentFile().getName(), connectionsPerHost);
- }
-
- public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace)
- {
- this(directory, client, outputHandler, keyspace, 1);
- }
-
-
- public SSTableLoader(File directory, Client client, OutputHandler outputHandler, String keyspace, int connectionsPerHost)
- {
this.directory = directory;
- this.keyspace = keyspace;
+ this.keyspace = directory.getParentFile().getName();
this.client = client;
this.outputHandler = outputHandler;
this.connectionsPerHost = connectionsPerHost;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 2563f26..fa88817 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
@@ -35,7 +34,6 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -62,11 +60,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
private final DiskWriter diskWriter = new DiskWriter();
- SSTableSimpleUnsortedWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
+ SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
{
- super(cfs, partitioner, columns);
+ super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
- this.header = new SerializationHeader(true, cfs.metadata, columns, EncodingStats.NO_STATS);
+ this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
}
@@ -112,7 +110,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
private PartitionUpdate createPartitionUpdate(DecoratedKey key)
{
- return new PartitionUpdate(cfs.metadata, key, columns, 4)
+ return new PartitionUpdate(metadata, key, columns, 4)
{
@Override
public void add(Row row)
@@ -206,7 +204,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
if (b == SENTINEL)
return;
- try (SSTableTxnWriter writer = createWriter())
+ try (SSTableTxnWriter writer = createWriter())
{
for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
writer.append(entry.getValue().unfilteredIterator());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 2f6dd33..7fbd79d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,14 +19,12 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import com.google.common.base.Throwables;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.IPartitioner;
/**
* A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -45,9 +43,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
private SSTableTxnWriter writer;
- protected SSTableSimpleWriter(ColumnFamilyStore cfs, IPartitioner partitioner, PartitionColumns columns)
+ protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns)
{
- super(cfs, partitioner, columns);
+ super(directory, metadata, columns);
}
private SSTableTxnWriter getOrCreateWriter()
@@ -69,7 +67,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
if (update != null)
writePartition(update);
currentKey = key;
- update = new PartitionUpdate(cfs.metadata, currentKey, columns, 4);
+ update = new PartitionUpdate(metadata, currentKey, columns, 4);
}
assert update != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 28ca4c4..015c5bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -106,8 +107,15 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
@SuppressWarnings("resource") // log and writer closed during doPostCleanup
- public static SSTableTxnWriter createRangeAware(ColumnFamilyStore cfs, long keyCount, long repairedAt, SSTableFormat.Type type, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter createRangeAware(CFMetaData cfm,
+ long keyCount,
+ long repairedAt,
+ SSTableFormat.Type type,
+ int sstableLevel,
+ SerializationHeader header)
{
+
+ ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
SSTableMultiWriter writer;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 51b049d..1340224 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -92,7 +91,7 @@ public class LongStreamingTest
writer.close();
System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
- File[] dataFiles = writer.getInnermostDirectory().listFiles((dir, name) -> name.endsWith("-Data.db"));
+ File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db"));
long dataSize = 0l;
for (File file : dataFiles)
{
@@ -100,7 +99,7 @@ public class LongStreamingTest
dataSize += file.length();
}
- SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
+ SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
private String ks;
public void init(String keyspace)
@@ -127,7 +126,7 @@ public class LongStreamingTest
//Stream again
- loader = new SSTableLoader(writer.getInnermostDirectory(), new SSTableLoader.Client()
+ loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
private String ks;
public void init(String keyspace)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 515ce18..595610e 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -131,10 +131,12 @@ public class RealTransactionsTest extends SchemaLoader
{
cfs.truncateBlocking();
+ String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
- .withCfs(cfs)
+ .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
+ .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
.using(String.format(query, cfs.keyspace.getName(), cfs.name))
.build())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index 9502dfa..8025861 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -22,17 +22,15 @@ import java.io.FilenameFilter;
import java.io.IOException;
import com.google.common.io.Files;
-import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.tools.Util;
import static org.junit.Assert.assertEquals;
@@ -77,10 +75,16 @@ public class CQLSSTableWriterClientTest
writer.close();
writer2.close();
- FilenameFilter filter = (dir, name) -> name.endsWith("-Data.db");
+ FilenameFilter filter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith("-Data.db");
+ }
+ };
- File[] dataFiles = (File[])ArrayUtils.addAll(writer2.getInnermostDirectory().listFiles(filter),
- writer.getInnermostDirectory().listFiles(filter));
+ File[] dataFiles = this.testDirectory.listFiles(filter);
assertEquals(2, dataFiles.length);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 56e62ee..3c80b9e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -33,11 +33,9 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.auth.AuthConfig;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.UDHelper;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
@@ -97,7 +95,7 @@ public class CQLSSTableWriterTest
writer.close();
- loadSSTables(writer.getInnermostDirectory(), KS);
+ loadSSTables(dataDir, KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
assertEquals(4, rs.size());
@@ -187,7 +185,7 @@ public class CQLSSTableWriterTest
return name.endsWith("-Data.db");
}
};
- assert writer.getInnermostDirectory().list(filterDataFiles).length > 1 : Arrays.toString(writer.getInnermostDirectory().list(filterDataFiles));
+ assert dataDir.list(filterDataFiles).length > 1 : Arrays.toString(dataDir.list(filterDataFiles));
}
@@ -221,22 +219,28 @@ public class CQLSSTableWriterTest
private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
private class WriterThread extends Thread
{
+ private final File dataDir;
private final int id;
- private final ColumnFamilyStore cfs;
public volatile Exception exception;
- public WriterThread(ColumnFamilyStore cfs, int id)
+ public WriterThread(File dataDir, int id)
{
- this.cfs = cfs;
+ this.dataDir = dataDir;
this.id = id;
}
@Override
public void run()
{
+ String schema = "CREATE TABLE cql_keyspace2.table2 ("
+ + " k int,"
+ + " v int,"
+ + " PRIMARY KEY (k, v)"
+ + ")";
String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
- .withCfs(cfs)
+ .inDirectory(dataDir)
+ .forTable(schema)
.using(insert).build();
try
@@ -264,17 +268,10 @@ public class CQLSSTableWriterTest
File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
assert dataDir.mkdirs();
- String schema = "CREATE TABLE cql_keyspace2.table2 ("
- + " k int,"
- + " v int,"
- + " PRIMARY KEY (k, v)"
- + ")";
- ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(schema, Collections.singletonList(dataDir));
-
WriterThread[] threads = new WriterThread[5];
for (int i = 0; i < threads.length; i++)
{
- WriterThread thread = new WriterThread(cfs, i);
+ WriterThread thread = new WriterThread(dataDir, i);
threads[i] = thread;
thread.start();
}
@@ -289,7 +286,7 @@ public class CQLSSTableWriterTest
}
}
- loadSSTables(cfs.getDirectories().getDirectoryForNewSSTables(), KS);
+ loadSSTables(dataDir, KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
@@ -341,7 +338,7 @@ public class CQLSSTableWriterTest
}
writer.close();
- loadSSTables(writer.getInnermostDirectory(), KS);
+ loadSSTables(dataDir, KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
@@ -412,7 +409,7 @@ public class CQLSSTableWriterTest
}
writer.close();
- loadSSTables(writer.getInnermostDirectory(), KS);
+ loadSSTables(dataDir, KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
@@ -503,7 +500,7 @@ public class CQLSSTableWriterTest
writer.addRow(5, 5, 5, "5");
writer.close();
- loadSSTables(writer.getInnermostDirectory(), KS);
+ loadSSTables(dataDir, KS);
UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index ba7571f..72c7467 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -34,8 +33,6 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AsciiType;
@@ -125,8 +122,6 @@ public class SSTableLoaderTest
String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
-
- File outputDir;
try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
@@ -134,22 +129,22 @@ public class SSTableLoaderTest
.build())
{
writer.addRow("key1", "col1", "100");
- outputDir = writer.getInnermostDirectory();
}
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
final CountDownLatch latch = new CountDownLatch(1);
- SSTableLoader loader = new SSTableLoader(outputDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+ SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s;", KEYSPACE1, CF_STANDARD1));
-
- assertEquals(1, rs.size());
-
- Iterator<UntypedResultSet.Row> iter = rs.iterator();
- UntypedResultSet.Row row;
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
- row = iter.next();
- assertEquals("key1", row.getString("key"));
+ assertEquals(1, partitions.size());
+ assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+ assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
+ .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
+ .value());
// The stream future is signalled when the work is complete but before releasing references. Wait for release
// before cleanup (CASSANDRA-10118).
@@ -165,9 +160,8 @@ public class SSTableLoaderTest
//make sure we have no tables...
assertTrue(dataDir.listFiles().length == 0);
- //Since this is running in the same jvm we need to put it in a tmp keyspace
- String schema = "CREATE TABLE \"%stmp\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name)) with compression = {}";
- String query = "INSERT INTO \"%stmp\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
+ String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+ String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
@@ -176,7 +170,7 @@ public class SSTableLoaderTest
.withBufferSizeInMB(1)
.build();
- int NB_PARTITIONS = 4200; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
+ int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one completed sstable before we've closed the writer
for (int i = 0; i < NB_PARTITIONS; i++)
{
@@ -188,11 +182,11 @@ public class SSTableLoaderTest
cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
//make sure we have some tables...
- assertTrue(writer.getInnermostDirectory().listFiles().length > 0);
+ assertTrue(dataDir.listFiles().length > 0);
final CountDownLatch latch = new CountDownLatch(2);
//writer is still open so loader should not load anything
- SSTableLoader loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
+ SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
@@ -202,7 +196,7 @@ public class SSTableLoaderTest
// now we complete the write and the second loader should load the last sstable as well
writer.close();
- loader = new SSTableLoader(writer.getInnermostDirectory(), new TestClient(), new OutputHandler.SystemOutput(false, false), KEYSPACE1);
+ loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
new file mode 100644
index 0000000..4fe05a8
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.functions.UDHelper;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.CreateTypeStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.UpdateStatement;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Utility to write SSTables.
+ * <p>
+ * Typical usage looks like:
+ * <pre>
+ * String type = CREATE TYPE myKs.myType (a int, b int)";
+ * String schema = "CREATE TABLE myKs.myTable ("
+ * + " k int PRIMARY KEY,"
+ * + " v1 text,"
+ * + " v2 int,"
+ * + " v3 myType,"
+ * + ")";
+ * String insert = "INSERT INTO myKs.myTable (k, v1, v2, v3) VALUES (?, ?, ?, ?)";
+ *
+ * // Creates a new writer. You need to provide at least the directory where to write the created sstable,
+ * // the schema for the sstable to write and a (prepared) insert statement to use. If you do not use the
+ * // default partitioner (Murmur3Partitioner), you will also need to provide the partitioner in use, see
+ * // StressCQLSSTableWriter.Builder for more details on the available options.
+ * StressCQLSSTableWriter writer = StressCQLSSTableWriter.builder()
+ * .inDirectory("path/to/directory")
+ * .withType(type)
+ * .forTable(schema)
+ * .using(insert).build();
+ *
+ * UserType myType = writer.getUDType("myType");
+ * // Adds a nember of rows to the resulting sstable
+ * writer.addRow(0, "test1", 24, myType.newValue().setInt("a", 10).setInt("b", 20));
+ * writer.addRow(1, "test2", null, null);
+ * writer.addRow(2, "test3", 42, myType.newValue().setInt("a", 30).setInt("b", 40));
+ *
+ * // Close the writer, finalizing the sstable
+ * writer.close();
+ * </pre>
+ *
+ * Please note that {@code StressCQLSSTableWriter} is <b>not</b> thread-safe (multiple threads cannot access the
+ * same instance). It is however safe to use multiple instances in parallel (even if those instance write
+ * sstables for the same table).
+ */
+public class StressCQLSSTableWriter implements Closeable
+{
+ public static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER;
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization(false);
+ // Partitioner is not set in client mode.
+ if (DatabaseDescriptor.getPartitioner() == null)
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+
+ private final AbstractSSTableSimpleWriter writer;
+ private final UpdateStatement insert;
+ private final List<ColumnSpecification> boundNames;
+ private final List<TypeCodec> typeCodecs;
+ private final ColumnFamilyStore cfs;
+
+ private StressCQLSSTableWriter(ColumnFamilyStore cfs, AbstractSSTableSimpleWriter writer, UpdateStatement insert, List<ColumnSpecification> boundNames)
+ {
+ this.cfs = cfs;
+ this.writer = writer;
+ this.insert = insert;
+ this.boundNames = boundNames;
+ this.typeCodecs = boundNames.stream().map(bn -> UDHelper.codecFor(UDHelper.driverType(bn.type)))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Returns a new builder for a StressCQLSSTableWriter.
+ *
+ * @return the new builder.
+ */
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * Adds a new row to the writer.
+ * <p>
+ * This is a shortcut for {@code addRow(Arrays.asList(values))}.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating by this writer).
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter addRow(Object... values)
+ throws InvalidRequestException, IOException
+ {
+ return addRow(Arrays.asList(values));
+ }
+
+ /**
+ * Adds a new row to the writer.
+ * <p>
+ * Each provided value type should correspond to the types of the CQL column
+ * the value is for. The correspondance between java type and CQL type is the
+ * same one than the one documented at
+ * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass().
+ * <p>
+ * If you prefer providing the values directly as binary, use
+ * {@link #rawAddRow} instead.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating by this writer).
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter addRow(List<Object> values)
+ throws InvalidRequestException, IOException
+ {
+ int size = Math.min(values.size(), boundNames.size());
+ List<ByteBuffer> rawValues = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++)
+ {
+ Object value = values.get(i);
+ rawValues.add(serialize(value, typeCodecs.get(i)));
+ }
+
+ return rawAddRow(rawValues);
+ }
+
+ /**
+ * Adds a new row to the writer.
+ * <p>
+ * This is equivalent to the other addRow methods, but takes a map whose
+ * keys are the names of the columns to add instead of taking a list of the
+ * values in the order of the insert statement used during construction of
+ * this write.
+ * <p>
+ * Please note that the column names in the map keys must be in lowercase unless
+ * the declared column name is a
+ * <a href="http://cassandra.apache.org/doc/cql3/CQL.html#identifiers">case-sensitive quoted identifier</a>
+ * (in which case the map key must use the exact case of the column).
+ *
+ * @param values a map of colum name to column values representing the new
+ * row to add. Note that if a column is not part of the map, it's value will
+ * be {@code null}. If the map contains keys that does not correspond to one
+ * of the column of the insert statement used when creating this writer, the
+ * the corresponding value is ignored.
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter addRow(Map<String, Object> values)
+ throws InvalidRequestException, IOException
+ {
+ int size = boundNames.size();
+ List<ByteBuffer> rawValues = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ ColumnSpecification spec = boundNames.get(i);
+ Object value = values.get(spec.name.toString());
+ rawValues.add(serialize(value, typeCodecs.get(i)));
+ }
+ return rawAddRow(rawValues);
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating by this writer) as binary.
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter rawAddRow(ByteBuffer... values)
+ throws InvalidRequestException, IOException
+ {
+ return rawAddRow(Arrays.asList(values));
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ * <p>
+ * This is a shortcut for {@code rawAddRow(Arrays.asList(values))}.
+ *
+ * @param values the row values (corresponding to the bind variables of the
+ * insertion statement used when creating by this writer) as binary.
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter rawAddRow(List<ByteBuffer> values)
+ throws InvalidRequestException, IOException
+ {
+ if (values.size() != boundNames.size())
+ throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
+
+ QueryOptions options = QueryOptions.forInternalCalls(null, values);
+ List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+ SortedSet<Clustering> clusterings = insert.createClustering(options);
+
+ long now = System.currentTimeMillis() * 1000;
+ // Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
+ // and that forces a lot of initialization that we don't want.
+ UpdateParameters params = new UpdateParameters(insert.cfm,
+ insert.updatedColumns(),
+ options,
+ insert.getTimestamp(now, options),
+ insert.getTimeToLive(options),
+ Collections.<DecoratedKey, Partition>emptyMap());
+
+ try
+ {
+ for (ByteBuffer key : keys)
+ {
+ for (Clustering clustering : clusterings)
+ insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
+ }
+ return this;
+ }
+ catch (SSTableSimpleUnsortedWriter.SyncException e)
+ {
+ // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
+ // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE.
+ throw (IOException)e.getCause();
+ }
+ }
+
+ /**
+ * Adds a new row to the writer given already serialized values.
+ * <p>
+ * This is equivalent to the other rawAddRow methods, but takes a map whose
+ * keys are the names of the columns to add instead of taking a list of the
+ * values in the order of the insert statement used during construction of
+ * this write.
+ *
+ * @param values a map of colum name to column values representing the new
+ * row to add. Note that if a column is not part of the map, it's value will
+ * be {@code null}. If the map contains keys that does not correspond to one
+ * of the column of the insert statement used when creating this writer, the
+ * the corresponding value is ignored.
+ * @return this writer.
+ */
+ public StressCQLSSTableWriter rawAddRow(Map<String, ByteBuffer> values)
+ throws InvalidRequestException, IOException
+ {
+ int size = Math.min(values.size(), boundNames.size());
+ List<ByteBuffer> rawValues = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ ColumnSpecification spec = boundNames.get(i);
+ rawValues.add(values.get(spec.name.toString()));
+ }
+ return rawAddRow(rawValues);
+ }
+
+ /**
+ * Returns the User Defined type, used in this SSTable Writer, that can
+ * be used to create UDTValue instances.
+ *
+ * @param dataType name of the User Defined type
+ * @return user defined type
+ */
+ public com.datastax.driver.core.UserType getUDType(String dataType)
+ {
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(insert.keyspace());
+ UserType userType = ksm.types.getNullable(ByteBufferUtil.bytes(dataType));
+ return (com.datastax.driver.core.UserType) UDHelper.driverType(userType);
+ }
+
+ /**
+ * Close this writer.
+ * <p>
+ * This method should be called, otherwise the produced sstables are not
+ * guaranteed to be complete (and won't be in practice).
+ */
+ public void close() throws IOException
+ {
+ writer.close();
+ }
+
+ private ByteBuffer serialize(Object value, TypeCodec codec)
+ {
+ if (value == null || value == UNSET_VALUE)
+ return (ByteBuffer) value;
+
+ return codec.serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
+ }
+ /**
+ * The writer loads data in directories corresponding to how they laid out on the server.
+ * <p>
+ * {keyspace}/{table-cfid}/
+ *
+ * This method can be used to fetch the innermost directory with the sstable components
+ * @return The directory containing the sstable components
+ */
+ public File getInnermostDirectory()
+ {
+ return cfs.getDirectories().getDirectoryForNewSSTables();
+ }
+
+ /**
+ * A Builder for a StressCQLSSTableWriter object.
+ */
+ public static class Builder
+ {
+ private final List<File> directoryList;
+ private ColumnFamilyStore cfs;
+
+ protected SSTableFormat.Type formatType = null;
+
+ private Boolean makeRangeAware = false;
+
+ private CreateTableStatement.RawStatement schemaStatement;
+ private final List<CreateTypeStatement> typeStatements;
+ private UpdateStatement.ParsedInsert insertStatement;
+ private IPartitioner partitioner;
+
+ private boolean sorted = false;
+ private long bufferSizeInMB = 128;
+
+ protected Builder()
+ {
+ this.typeStatements = new ArrayList<>();
+ this.directoryList = new ArrayList<>();
+ }
+
+ /**
+ * The directory where to write the sstables.
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param directory the directory to use, which should exists and be writable.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+ */
+ public Builder inDirectory(String directory)
+ {
+ return inDirectory(new File(directory));
+ }
+
+ /**
+ * The directory where to write the sstables (mandatory option).
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param directory the directory to use, which should exist and be writable.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code directory} doesn't exist or is not writable.
+ */
+ public Builder inDirectory(File directory)
+ {
+ if (!directory.exists())
+ throw new IllegalArgumentException(directory + " doesn't exists");
+ if (!directory.canWrite())
+ throw new IllegalArgumentException(directory + " exists but is not writable");
+
+ directoryList.add(directory);
+ return this;
+ }
+
+ /**
+ * A pre-instanciated ColumnFamilyStore
+ * <p>
+ * This is can be used in place of inDirectory and forTable
+ *
+ * @see #inDirectory(File)
+ *
+ * @param cfs the list of directories to use, which should exist and be writable.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if a directory doesn't exist or is not writable.
+ */
+ public Builder withCfs(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ return this;
+ }
+
+
+ public Builder withType(String typeDefinition) throws SyntaxException
+ {
+ typeStatements.add(parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE"));
+ return this;
+ }
+
+ /**
+ * The schema (CREATE TABLE statement) for the table for which sstable are to be created.
+ * <p>
+ * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified
+ * table name, one that include the keyspace name.
+ * <p>
+ * This is a mandatory option.
+ *
+ * @param schema the schema of the table for which sstables are to be created.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement
+ * or does not have a fully-qualified table name.
+ */
+ public Builder forTable(String schema)
+ {
+ this.schemaStatement = parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE");
+ return this;
+ }
+
+ /**
+ * The partitioner to use.
+ * <p>
+ * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
+ * by the cluster for which the SSTables are created, you need to use this method to
+ * provide the correct partitioner.
+ *
+ * @param partitioner the partitioner to use.
+ * @return this builder.
+ */
+ public Builder withPartitioner(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+
+ /**
+ * Specify if the sstable writer should be vnode range aware.
+ * This will create a sstable per vnode range.
+ *
+ * @param makeRangeAware
+ * @return
+ */
+ public Builder rangeAware(boolean makeRangeAware)
+ {
+ this.makeRangeAware = makeRangeAware;
+ return this;
+ }
+
+ /**
+ * The INSERT statement defining the order of the values to add for a given CQL row.
+ * <p>
+ * Please note that the provided INSERT statement <b>must</b> use a fully-qualified
+ * table name, one that include the keyspace name. Morewover, said statement must use
+ * bind variables since it is those bind variables that will be bound to values by the
+ * resulting writer.
+ * <p>
+ * This is a mandatory option, and this needs to be called after foTable().
+ *
+ * @param insert an insertion statement that defines the order
+ * of column values to use.
+ * @return this builder.
+ *
+ * @throws IllegalArgumentException if {@code insertStatement} is not a valid insertion
+ * statement, does not have a fully-qualified table name or have no bind variables.
+ */
+ public Builder using(String insert)
+ {
+ this.insertStatement = parseStatement(insert, UpdateStatement.ParsedInsert.class, "INSERT");
+ return this;
+ }
+
+ /**
+ * The size of the buffer to use.
+ * <p>
+ * This defines how much data will be buffered before being written as
+ * a new SSTable. This correspond roughly to the data size that will have the created
+ * sstable.
+ * <p>
+ * The default is 128MB, which should be reasonable for a 1GB heap. If you experience
+ * OOM while using the writer, you should lower this value.
+ *
+ * @param size the size to use in MB.
+ * @return this builder.
+ */
+ public Builder withBufferSizeInMB(int size)
+ {
+ this.bufferSizeInMB = size;
+ return this;
+ }
+
+ /**
+ * Creates a StressCQLSSTableWriter that expects sorted inputs.
+ * <p>
+ * If this option is used, the resulting writer will expect rows to be
+ * added in SSTable sorted order (and an exception will be thrown if that
+ * is not the case during insertion). The SSTable sorted order means that
+ * rows are added such that their partition key respect the partitioner
+ * order.
+ * <p>
+ * You should thus only use this option is you know that you can provide
+ * the rows in order, which is rarely the case. If you can provide the
+ * rows in order however, using this sorted might be more efficient.
+ * <p>
+ * Note that if used, some option like withBufferSizeInMB will be ignored.
+ *
+ * @return this builder.
+ */
+ public Builder sorted()
+ {
+ this.sorted = true;
+ return this;
+ }
+
+ @SuppressWarnings("resource")
+ public StressCQLSSTableWriter build()
+ {
+ if (directoryList.isEmpty() && cfs == null)
+ throw new IllegalStateException("No output directories specified, you should provide a directory with inDirectory()");
+ if (schemaStatement == null && cfs == null)
+ throw new IllegalStateException("Missing schema, you should provide the schema for the SSTable to create with forTable()");
+ if (insertStatement == null)
+ throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
+
+ synchronized (StressCQLSSTableWriter.class)
+ {
+ if (cfs == null)
+ cfs = createOfflineTable(schemaStatement, typeStatements, directoryList);
+
+ if (partitioner == null)
+ partitioner = cfs.getPartitioner();
+
+ Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert();
+ AbstractSSTableSimpleWriter writer = sorted
+ ? new SSTableSimpleWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns())
+ : new SSTableSimpleUnsortedWriter(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.metadata, preparedInsert.left.updatedColumns(), bufferSizeInMB);
+
+ if (formatType != null)
+ writer.setSSTableFormatType(formatType);
+
+ writer.setRangeAwareWriting(makeRangeAware);
+
+ return new StressCQLSSTableWriter(cfs, writer, preparedInsert.left, preparedInsert.right);
+ }
+ }
+
+ private static void createTypes(String keyspace, List<CreateTypeStatement> typeStatements)
+ {
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+ Types.RawBuilder builder = Types.rawBuilder(keyspace);
+ for (CreateTypeStatement st : typeStatements)
+ st.addToRawBuilder(builder);
+
+ ksm = ksm.withSwapped(builder.build());
+ Schema.instance.setKeyspaceMetadata(ksm);
+ }
+
+ public static ColumnFamilyStore createOfflineTable(String schema, List<File> directoryList)
+ {
+ return createOfflineTable(parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"), Collections.EMPTY_LIST, directoryList);
+ }
+
+ /**
+ * Creates the table according to schema statement
+ * with specified data directories
+ */
+ public static ColumnFamilyStore createOfflineTable(CreateTableStatement.RawStatement schemaStatement, List<CreateTypeStatement> typeStatements, List<File> directoryList)
+ {
+ String keyspace = schemaStatement.keyspace();
+
+ if (Schema.instance.getKSMetaData(keyspace) == null)
+ Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+
+ createTypes(keyspace, typeStatements);
+
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+
+ CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
+ assert cfMetaData == null;
+
+ CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
+ statement.validate(ClientState.forInternalCalls());
+
+ //Build metatdata with a portable cfId
+ cfMetaData = statement.metadataBuilder()
+ .withId(CFMetaData.generateLegacyCfId(keyspace, statement.columnFamily()))
+ .build()
+ .params(statement.params());
+
+ Keyspace.setInitialized();
+ Directories directories = new Directories(cfMetaData, directoryList.stream().map(Directories.DataDirectory::new).collect(Collectors.toList()));
+
+ Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
+ ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, cfMetaData.cfName, cfMetaData, directories, false, false, true);
+
+ ks.initCfCustom(cfs);
+ Schema.instance.load(cfs.metadata);
+ Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfs.metadata)));
+
+ return cfs;
+ }
+
+ /**
+ * Prepares insert statement for writing data to SSTable
+ *
+ * @return prepared Insert statement and it's bound names
+ */
+ private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert()
+ {
+ ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
+ UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
+ insert.validate(ClientState.forInternalCalls());
+
+ if (insert.hasConditions())
+ throw new IllegalArgumentException("Conditional statements are not supported");
+ if (insert.isCounter())
+ throw new IllegalArgumentException("Counter update statements are not supported");
+ if (cqlStatement.boundNames.isEmpty())
+ throw new IllegalArgumentException("Provided insert statement has no bind variables");
+
+ return Pair.create(insert, cqlStatement.boundNames);
+ }
+ }
+
+ public static <T extends ParsedStatement> T parseStatement(String query, Class<T> klass, String type)
+ {
+ try
+ {
+ ParsedStatement stmt = QueryProcessor.parseStatement(query);
+
+ if (!stmt.getClass().equals(klass))
+ throw new IllegalArgumentException("Invalid query, must be a " + type + " statement but was: " + stmt.getClass());
+
+ return klass.cast(stmt);
+ }
+ catch (RequestValidationException e)
+ {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
index 664f8d2..4180524 100644
--- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
+++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
@@ -24,39 +24,32 @@ import java.net.InetAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.*;
-import java.util.stream.Collectors;
import javax.inject.Inject;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.command.*;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.StressCQLSSTableWriter;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.tools.Util;
import org.apache.cassandra.tools.nodetool.CompactionStats;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -79,6 +72,11 @@ public abstract class CompactionStress implements Runnable
@Option(name = {"-v", "--vnodes"}, description = "number of local tokens to generate (default 256)")
Integer numTokens = 256;
+ static
+ {
+ DatabaseDescriptor.toolInitialization();
+ }
+
List<File> getDataDirectories()
{
List<File> dataDirectories = new ArrayList<>(dataDirs.size());
@@ -112,14 +110,12 @@ public abstract class CompactionStress implements Runnable
ColumnFamilyStore initCf(StressProfile stressProfile, boolean loadSSTables)
{
- Util.initDatabaseDescriptor();
-
generateTokens(stressProfile.seedStr, StorageService.instance.getTokenMetadata(), numTokens);
CreateTableStatement.RawStatement createStatement = stressProfile.getCreateStatement();
List<File> dataDirectories = getDataDirectories();
- ColumnFamilyStore cfs = CQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories);
+ ColumnFamilyStore cfs = StressCQLSSTableWriter.Builder.createOfflineTable(createStatement, Collections.EMPTY_LIST, dataDirectories);
if (loadSSTables)
{
@@ -302,7 +298,7 @@ public abstract class CompactionStress implements Runnable
{
//Every thread needs it's own writer
final SchemaInsert insert = stressProfile.getOfflineInsert(null, generator, seedManager, settings);
- final CQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware);
+ final StressCQLSSTableWriter tableWriter = insert.createWriter(cfs, bufferSize, makeRangeAware);
executorService.submit(() -> {
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index b45462f..4fb70c6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -62,7 +62,7 @@ import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import org.yaml.snakeyaml.error.YAMLException;
-import static org.apache.cassandra.io.sstable.CQLSSTableWriter.parseStatement;
+import static org.apache.cassandra.io.sstable.StressCQLSSTableWriter.parseStatement;
public class StressProfile implements Serializable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0026e4ee/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index 96b3392..2c717a1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -21,26 +21,18 @@ package org.apache.cassandra.stress.operations.userdefined;
*/
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
-import com.google.common.util.concurrent.Uninterruptibles;
-
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.StressCQLSSTableWriter;
import org.apache.cassandra.stress.WorkManager;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.report.Timer;
@@ -142,9 +134,9 @@ public class SchemaInsert extends SchemaStatement
private class OfflineRun extends Runner
{
- final CQLSSTableWriter writer;
+ final StressCQLSSTableWriter writer;
- OfflineRun(CQLSSTableWriter writer)
+ OfflineRun(StressCQLSSTableWriter writer)
{
this.writer = writer;
}
@@ -182,9 +174,9 @@ public class SchemaInsert extends SchemaStatement
timeWithRetry(new ThriftRun(client));
}
- public CQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
+ public StressCQLSSTableWriter createWriter(ColumnFamilyStore cfs, int bufferSize, boolean makeRangeAware)
{
- return CQLSSTableWriter.builder()
+ return StressCQLSSTableWriter.builder()
.withCfs(cfs)
.withBufferSizeInMB(bufferSize)
.forTable(tableSchema)
@@ -193,7 +185,7 @@ public class SchemaInsert extends SchemaStatement
.build();
}
- public void runOffline(CQLSSTableWriter writer, WorkManager workManager) throws Exception
+ public void runOffline(StressCQLSSTableWriter writer, WorkManager workManager) throws Exception
{
OfflineRun offline = new OfflineRun(writer);