You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 15:10:23 UTC
[2/5] cassandra git commit: cassandra-stress simultaneous inserts over same seed
cassandra-stress simultaneous inserts over same seed
patch by benedict; reviewed by rstupp CASSANDRA-7964
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c579a01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c579a01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c579a01
Branch: refs/heads/cassandra-2.1
Commit: 6c579a0102fa3e67215fef5d9f8aa97191e3a216
Parents: cdba5aa
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 14:09:37 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 14:09:37 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 86 ++-
.../apache/cassandra/stress/StressAction.java | 96 +---
.../apache/cassandra/stress/StressMetrics.java | 1 -
.../apache/cassandra/stress/StressProfile.java | 65 +--
.../apache/cassandra/stress/StressServer.java | 3 +-
.../cassandra/stress/generate/Partition.java | 554 -------------------
.../stress/generate/PartitionGenerator.java | 39 +-
.../apache/cassandra/stress/generate/Seed.java | 52 +-
.../cassandra/stress/generate/SeedManager.java | 66 ++-
.../cassandra/stress/generate/values/Bytes.java | 7 +-
.../stress/generate/values/GeneratorConfig.java | 9 +-
.../stress/generate/values/Strings.java | 2 -
.../stress/generate/values/TimeUUIDs.java | 4 +-
.../stress/operations/FixedOpDistribution.java | 7 -
.../stress/operations/OpDistribution.java | 1 -
.../operations/SampledOpDistribution.java | 9 -
.../operations/predefined/CqlCounterAdder.java | 5 +-
.../operations/predefined/CqlCounterGetter.java | 5 +-
.../operations/predefined/CqlInserter.java | 6 +-
.../operations/predefined/CqlOperation.java | 32 +-
.../stress/operations/predefined/CqlReader.java | 7 +-
.../predefined/PredefinedOperation.java | 40 +-
.../predefined/ThriftCounterAdder.java | 9 +-
.../predefined/ThriftCounterGetter.java | 5 +-
.../operations/predefined/ThriftInserter.java | 11 +-
.../operations/predefined/ThriftReader.java | 7 +-
.../operations/userdefined/SchemaInsert.java | 52 +-
.../operations/userdefined/SchemaQuery.java | 55 +-
.../operations/userdefined/SchemaStatement.java | 16 +-
.../cassandra/stress/settings/Command.java | 5 +-
.../stress/settings/OptionAnyProbabilities.java | 8 +-
.../stress/settings/OptionDistribution.java | 4 +-
.../settings/OptionEnumProbabilities.java | 2 +-
.../cassandra/stress/settings/OptionMulti.java | 7 +-
.../settings/OptionRatioDistribution.java | 15 +-
.../stress/settings/SettingsColumn.java | 12 +-
.../stress/settings/SettingsCommand.java | 1 -
.../settings/SettingsCommandPreDefined.java | 7 +-
.../SettingsCommandPreDefinedMixed.java | 6 +-
.../stress/settings/SettingsCommandUser.java | 14 +-
.../stress/settings/SettingsErrors.java | 3 -
.../cassandra/stress/settings/SettingsNode.java | 7 +-
.../stress/settings/SettingsSchema.java | 1 -
.../stress/settings/StressSettings.java | 3 -
.../cassandra/stress/util/DynamicList.java | 15 +-
.../cassandra/stress/util/JavaDriverClient.java | 4 +-
.../stress/util/SmartThriftClient.java | 3 +-
.../org/apache/cassandra/stress/util/Timer.java | 1 -
.../apache/cassandra/stress/util/Timing.java | 1 -
.../cassandra/stress/util/TimingInterval.java | 1 -
51 files changed, 323 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa42e85..25140a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
* Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
* Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
* Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 5560240..edf3a54 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,13 +19,13 @@ package org.apache.cassandra.stress;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.Partition;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.settings.*;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.settings.SettingsLog;
+import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
@@ -36,17 +36,42 @@ public abstract class Operation
{
public final StressSettings settings;
public final Timer timer;
- public final PartitionGenerator generator;
- public final Distribution partitionCount;
+ protected final DataSpec spec;
+
+ private final List<PartitionIterator> partitionCache = new ArrayList<>();
+ protected List<PartitionIterator> partitions;
- protected List<Partition> partitions;
+ public static final class DataSpec
+ {
+ public final PartitionGenerator partitionGenerator;
+ final SeedManager seedManager;
+ final Distribution partitionCount;
+ final RatioDistribution useRatio;
+ final Integer targetCount;
+
+ public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, Integer targetCount)
+ {
+ this(partitionGenerator, seedManager, partitionCount, null, targetCount);
+ }
+ public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio)
+ {
+ this(partitionGenerator, seedManager, partitionCount, useRatio, null);
+ }
+ private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, Integer targetCount)
+ {
+ this.partitionGenerator = partitionGenerator;
+ this.seedManager = seedManager;
+ this.partitionCount = partitionCount;
+ this.useRatio = useRatio;
+ this.targetCount = targetCount;
+ }
+ }
- public Operation(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount)
+ public Operation(Timer timer, StressSettings settings, DataSpec spec)
{
- this.generator = generator;
this.timer = timer;
this.settings = settings;
- this.partitionCount = partitionCount;
+ this.spec = spec;
}
public static interface RunOp
@@ -56,9 +81,42 @@ public abstract class Operation
public int rowCount();
}
- protected void setPartitions(List<Partition> partitions)
+ boolean ready(WorkManager permits, RateLimiter rateLimiter)
{
- this.partitions = partitions;
+ int partitionCount = (int) spec.partitionCount.next();
+ if (partitionCount <= 0)
+ return false;
+ partitionCount = permits.takePermits(partitionCount);
+ if (partitionCount <= 0)
+ return false;
+
+ int i = 0;
+ boolean success = true;
+ for (; i < partitionCount && success ; i++)
+ {
+ if (i >= partitionCache.size())
+ partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
+
+ success = false;
+ while (!success)
+ {
+ Seed seed = spec.seedManager.next(this);
+ if (seed == null)
+ break;
+
+ if (spec.useRatio == null)
+ success = partitionCache.get(i).reset(seed, spec.targetCount, this);
+ else
+ success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this);
+ }
+ }
+ partitionCount = i;
+
+ if (rateLimiter != null)
+ rateLimiter.acquire(partitionCount);
+
+ partitions = partitionCache.subList(0, partitionCount);
+ return !partitions.isEmpty();
}
public boolean isWrite()
@@ -135,7 +193,7 @@ public abstract class Operation
private String key()
{
List<String> keys = new ArrayList<>();
- for (Partition partition : partitions)
+ for (PartitionIterator partition : partitions)
keys.add(partition.getKeyAsString());
return keys.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 68e0004..1433742 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -21,19 +21,16 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.stress.generate.Partition;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
-import org.apache.cassandra.stress.settings.*;
+import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
@@ -180,9 +177,9 @@ public class StressAction implements Runnable
: "until stderr of mean < " + settings.command.targetUncertainty));
final WorkManager workManager;
if (opCount < 0)
- workManager = new ContinuousWorkManager();
+ workManager = new WorkManager.ContinuousWorkManager();
else
- workManager = new FixedWorkManager(opCount);
+ workManager = new WorkManager.FixedWorkManager(opCount);
final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings);
@@ -285,36 +282,12 @@ public class StressAction implements Runnable
throw new IllegalStateException();
}
- int maxBatchSize = operations.maxBatchSize();
- Partition[] partitions = new Partition[maxBatchSize];
while (true)
{
-
- // TODO: Operation should be able to ecapsulate much of this behaviour
Operation op = operations.next();
- op.generator.reset();
-
- int batchSize = workManager.takePermits(Math.max(1, (int) op.partitionCount.next()));
- if (batchSize < 0)
- break;
-
- if (rateLimiter != null)
- rateLimiter.acquire(batchSize);
-
- int partitionCount = 0;
- while (partitionCount < batchSize)
- {
- Partition p = op.generator.generate(op);
- if (p == null)
- break;
- partitions[partitionCount++] = p;
- }
-
- if (partitionCount == 0)
+ if (!op.ready(workManager, rateLimiter))
break;
- op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount));
-
try
{
switch (settings.mode.api)
@@ -358,65 +331,4 @@ public class StressAction implements Runnable
}
- private interface WorkManager
- {
- // -1 indicates consumer should terminate
- int takePermits(int count);
-
- // signal all consumers to terminate
- void stop();
- }
-
- private static final class FixedWorkManager implements WorkManager
- {
-
- final AtomicLong permits;
-
- public FixedWorkManager(long permits)
- {
- this.permits = new AtomicLong(permits);
- }
-
- @Override
- public int takePermits(int count)
- {
- while (true)
- {
- long cur = permits.get();
- if (cur == 0)
- return -1;
- count = (int) Math.min(count, cur);
- long next = cur - count;
- if (permits.compareAndSet(cur, next))
- return count;
- }
- }
-
- @Override
- public void stop()
- {
- permits.getAndSet(0);
- }
- }
-
- private static final class ContinuousWorkManager implements WorkManager
- {
-
- volatile boolean stop = false;
-
- @Override
- public int takePermits(int count)
- {
- if (stop)
- return -1;
- return count;
- }
-
- @Override
- public void stop()
- {
- stop = true;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 3a4a4a3..d1cc0d4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.stress.settings.SettingsLog;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JmxCollector;
import org.apache.cassandra.stress.util.Timing;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 76642be..1517fcb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -21,37 +21,26 @@
package org.apache.cassandra.stress;
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
import org.apache.cassandra.exceptions.RequestValidationException;
-
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.RatioDistributionFactory;
-import org.apache.cassandra.stress.generate.SeedManager;
-import org.apache.cassandra.stress.generate.values.Booleans;
-import org.apache.cassandra.stress.generate.values.Bytes;
-import org.apache.cassandra.stress.generate.values.Generator;
-import org.apache.cassandra.stress.generate.values.Dates;
-import org.apache.cassandra.stress.generate.values.Doubles;
-import org.apache.cassandra.stress.generate.values.Floats;
-import org.apache.cassandra.stress.generate.values.GeneratorConfig;
-import org.apache.cassandra.stress.generate.values.Inets;
-import org.apache.cassandra.stress.generate.values.Integers;
-import org.apache.cassandra.stress.generate.values.Lists;
-import org.apache.cassandra.stress.generate.values.Longs;
-import org.apache.cassandra.stress.generate.values.Sets;
-import org.apache.cassandra.stress.generate.values.Strings;
-import org.apache.cassandra.stress.generate.values.TimeUUIDs;
-import org.apache.cassandra.stress.generate.values.UUIDs;
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.generate.values.*;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
import org.apache.cassandra.stress.settings.OptionDistribution;
@@ -68,19 +57,6 @@ import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import org.yaml.snakeyaml.error.YAMLException;
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
public class StressProfile implements Serializable
{
private String keyspaceCql;
@@ -247,7 +223,7 @@ public class StressProfile implements Serializable
}
}
- public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, StressSettings settings)
+ public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings)
{
if (queryStatements == null)
{
@@ -286,10 +262,11 @@ public class StressProfile implements Serializable
name = name.toLowerCase();
if (!queryStatements.containsKey(name))
throw new IllegalArgumentException("No query defined with name " + name);
- return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name));
+ return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name),
+ ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name));
}
- public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
if (insertStatement == null)
{
@@ -401,7 +378,7 @@ public class StressProfile implements Serializable
}
}
- return new SchemaInsert(timer, generator, settings, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+ return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
}
private static <E> E select(E first, String key, String defValue, Map<String, String> map, Function<String, E> builder)
@@ -415,7 +392,7 @@ public class StressProfile implements Serializable
return builder.apply(defValue);
}
- public PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds)
+ public PartitionGenerator newGenerator(StressSettings settings)
{
if (generatorFactory == null)
{
@@ -427,7 +404,7 @@ public class StressProfile implements Serializable
}
}
- return generatorFactory.newGenerator(settings, seeds);
+ return generatorFactory.newGenerator(settings);
}
private class GeneratorFactory
@@ -449,9 +426,9 @@ public class StressProfile implements Serializable
valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName())));
}
- PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds)
+ PartitionGenerator newGenerator(StressSettings settings)
{
- return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), settings.generate.order, seeds);
+ return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), settings.generate.order);
}
List<Generator> get(List<ColumnInfo> columnInfos)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 3c9e2a6..a6dfaf4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -24,9 +24,10 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.commons.cli.*;
+import org.apache.cassandra.stress.settings.StressSettings;
+
public class StressServer
{
private static final Options availableOptions = new Options();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
deleted file mode 100644
index 66f8c1d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
+++ /dev/null
@@ -1,554 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.generate.values.Generator;
-
-// a partition is re-used to reduce garbage generation, as is its internal RowIterator
-// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
-// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
-// of a single component, and then generate the values within those batches as necessary. this will be difficult with
-// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
-// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
-public class Partition
-{
-
- private long idseed;
- private Seed seed;
- private final Object[] partitionKey;
- private final PartitionGenerator generator;
- private final RowIterator iterator;
-
- public Partition(PartitionGenerator generator)
- {
- this.generator = generator;
- this.partitionKey = new Object[generator.partitionKey.size()];
- if (generator.clusteringComponents.size() > 0)
- iterator = new MultiRowIterator();
- else
- iterator = new SingleRowIterator();
- }
-
- void setSeed(Seed seed)
- {
- long idseed = 0;
- for (int i = 0 ; i < partitionKey.length ; i++)
- {
- Generator generator = this.generator.partitionKey.get(i);
- // set the partition key seed based on the current work item we're processing
- generator.setSeed(seed.seed);
- Object key = generator.generate();
- partitionKey[i] = key;
- // then contribute this value to the data seed
- idseed = seed(key, generator.type, idseed);
- }
- this.seed = seed;
- this.idseed = idseed;
- }
-
- public RowIterator iterator(double useChance, boolean isWrite)
- {
- iterator.reset(useChance, 0, 1, isWrite);
- return iterator;
- }
-
- public RowIterator iterator(int targetCount, boolean isWrite)
- {
- iterator.reset(Double.NaN, targetCount, 1, isWrite);
- return iterator;
- }
-
- class SingleRowIterator extends RowIterator
- {
- boolean done;
-
- void reset(double useChance, int targetCount, int batches, boolean isWrite)
- {
- done = false;
- }
-
- public Iterable<Row> next()
- {
- if (done)
- return Collections.emptyList();
- for (int i = 0 ; i < row.row.length ; i++)
- {
- Generator gen = generator.valueComponents.get(i);
- gen.setSeed(idseed);
- row.row[i] = gen.generate();
- }
- done = true;
- return Collections.singleton(row);
- }
-
- public boolean done()
- {
- return done;
- }
-
- public void markWriteFinished()
- {
- assert done;
- generator.seeds.markFinished(seed);
- }
- }
-
- public abstract class RowIterator
- {
- // we reuse the row object to save garbage
- final Row row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
-
- public abstract Iterable<Row> next();
- public abstract boolean done();
- public abstract void markWriteFinished();
- abstract void reset(double useChance, int targetCount, int batches, boolean isWrite);
-
- public Partition partition()
- {
- return Partition.this;
- }
- }
-
- // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
- // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
- // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
- // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
- // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
- // TODO : guarantee at least one row is always returned
- // TODO : support first/last row, and constraining reads to rows we know are populated
- class MultiRowIterator extends RowIterator
- {
-
- // probability any single row will be generated in this iteration
- double useChance;
-
- // the seed used to generate the current values for the clustering components at each depth;
- // used to save recalculating it for each row, so we only need to recalc from prior row.
- final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
- // the components remaining to be visited for each level of the current stack
- final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
-
- // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
- // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
- final double[] chancemodifier = new double[generator.clusteringComponents.size()];
- final double[] rollmodifier = new double[generator.clusteringComponents.size()];
-
- // track where in the partition we are, and where we are limited to
- final int[] position = new int[generator.clusteringComponents.size()];
- final int[] limit = new int[position.length];
- int batchSize;
- boolean returnedOne;
- boolean forceReturnOne;
-
- // reusable collections for generating unique and sorted clustering components
- final Set<Object> unique = new HashSet<>();
- final List<Comparable> tosort = new ArrayList<>();
- final Random random = new Random();
-
- MultiRowIterator()
- {
- for (int i = 0 ; i < clusteringComponents.length ; i++)
- clusteringComponents[i] = new ArrayDeque<>();
- rollmodifier[0] = 1f;
- chancemodifier[0] = generator.clusteringChildAverages[0];
- }
-
- // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
- // count to decide how much we should return in one iteration
- void reset(double useChance, int targetCount, int batches, boolean isWrite)
- {
- if (this.useChance < 1d)
- {
- // we clear our prior roll-modifiers if the use chance was previously less-than zero
- Arrays.fill(rollmodifier, 1d);
- Arrays.fill(chancemodifier, 1d);
- }
-
- // set the seed for the first clustering component
- generator.clusteringComponents.get(0).setSeed(idseed);
- int[] position = seed.position;
-
- // calculate how many first clustering components we'll generate, and how many total rows this predicts
- int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
- int expectedRowCount;
-
- if (!isWrite && position != null)
- {
- expectedRowCount = 0;
- for (int i = 0 ; i < position.length ; i++)
- {
- expectedRowCount += position[i] * generator.clusteringChildAverages[i];
- limit[i] = position[i];
- }
- }
- else
- {
- expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0];
- if (isWrite)
- batches *= seed.visits;
- Arrays.fill(limit, Integer.MAX_VALUE);
- }
-
- batchSize = Math.max(1, expectedRowCount / batches);
- if (Double.isNaN(useChance))
- useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
-
- // clear any remnants of the last iteration, wire up our constants, and fill in the first clustering components
- this.useChance = useChance;
- this.returnedOne = false;
- for (Queue<?> q : clusteringComponents)
- q.clear();
- clusteringSeeds[0] = idseed;
- fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
-
- // seek to our start position
- seek(isWrite ? position : null);
- }
-
- // generate the clustering components for the provided depth; requires preceding components
- // to have been generated and their seeds populated into clusteringSeeds
- void fill(int depth)
- {
- long seed = clusteringSeeds[depth - 1];
- Generator gen = generator.clusteringComponents.get(depth);
- gen.setSeed(seed);
- clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
- fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
- }
-
- // generate the clustering components into the queue
- void fill(Queue<Object> queue, int count, Generator generator)
- {
- if (count == 1)
- {
- queue.add(generator.generate());
- return;
- }
-
- switch (Partition.this.generator.order)
- {
- case SORTED:
- if (Comparable.class.isAssignableFrom(generator.clazz))
- {
- tosort.clear();
- for (int i = 0 ; i < count ; i++)
- tosort.add((Comparable) generator.generate());
- Collections.sort(tosort);
- for (int i = 0 ; i < count ; i++)
- queue.add(tosort.get(i));
- break;
- }
- else
- {
- throw new RuntimeException("Generator class is not comparable: "+generator.clazz);
- }
- case ARBITRARY:
- unique.clear();
- for (int i = 0 ; i < count ; i++)
- {
- Object next = generator.generate();
- if (unique.add(next))
- queue.add(next);
- }
- break;
- case SHUFFLED:
- unique.clear();
- tosort.clear();
- for (int i = 0 ; i < count ; i++)
- {
- Object next = generator.generate();
- if (unique.add(next))
- tosort.add(new RandomOrder(next));
- }
- Collections.sort(tosort);
- for (Object o : tosort)
- queue.add(((RandomOrder)o).value);
- break;
- default:
- throw new IllegalStateException();
- }
- }
-
- // seek to the provided position (or the first entry if null)
- private void seek(int[] position)
- {
- if (position == null)
- {
- this.position[0] = -1;
- clusteringComponents[0].addFirst(this);
- advance(0);
- return;
- }
-
- assert position.length == clusteringComponents.length;
- for (int i = 0 ; i < position.length ; i++)
- {
- if (i != 0)
- fill(i);
- for (int c = position[i] ; c > 0 ; c--)
- clusteringComponents[i].poll();
- row.row[i] = clusteringComponents[i].peek();
- }
- System.arraycopy(position, 0, this.position, 0, position.length);
- }
-
- // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
- // to move the iterator to the next item
- void advance()
- {
- // we are always at the leaf level when this method is invoked
- // so we calculate the seed for generating the row by combining the seed that generated the clustering components
- int depth = clusteringComponents.length - 1;
- long parentSeed = clusteringSeeds[depth];
- long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
-
- // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
- for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
- {
- Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
- gen.setSeed(rowSeed);
- row.row[i] = gen.generate();
- }
- returnedOne = true;
- forceReturnOne = false;
-
- // then we advance the leaf level
- advance(depth);
- }
-
- private void advance(int depth)
- {
- // advance the leaf component
- clusteringComponents[depth].poll();
- position[depth]++;
- while (true)
- {
- if (clusteringComponents[depth].isEmpty())
- {
- // if we've run out of clustering components at this level, ascend
- if (depth == 0)
- return;
- depth--;
- clusteringComponents[depth].poll();
- position[depth]++;
- continue;
- }
-
- if (depth == 0 && !returnedOne && clusteringComponents[0].size() == 1)
- forceReturnOne = true;
-
- // the chance of descending is the uniform usechance, multiplied by the number of children
- // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
- // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
- // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
- // chance of beating this next roll
- double thischance = useChance * chancemodifier[depth];
- if (forceReturnOne || thischance > 0.999f || thischance >= random.nextDouble())
- {
- // if we're descending, we fill in our clustering component and increase our depth
- row.row[depth] = clusteringComponents[depth].peek();
- depth++;
- if (depth == clusteringComponents.length)
- break;
- // if we haven't reached the leaf, we update our probability statistics, fill in all of
- // this level's clustering components, and repeat
- if (useChance < 1d)
- {
- rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
- chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth];
- }
- position[depth] = 0;
- fill(depth);
- continue;
- }
-
- // if we don't descend, we remove the clustering suffix we've skipped and continue
- clusteringComponents[depth].poll();
- position[depth]++;
- }
- }
-
- public Iterable<Row> next()
- {
- final int[] limit = position.clone();
- int remainingSize = batchSize;
- for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++)
- {
- limit[i] += remainingSize / generator.clusteringChildAverages[i];
- remainingSize %= generator.clusteringChildAverages[i];
- }
- assert remainingSize == 0;
- for (int i = limit.length - 1 ; i > 0 ; i--)
- {
- if (limit[i] > generator.clusteringChildAverages[i])
- {
- limit[i - 1] += limit[i] / generator.clusteringChildAverages[i];
- limit[i] %= generator.clusteringChildAverages[i];
- }
- }
- for (int i = 0 ; i < limit.length ; i++)
- {
- if (limit[i] < this.limit[i])
- break;
- limit[i] = Math.min(limit[i], this.limit[i]);
- }
- return new Iterable<Row>()
- {
- public Iterator<Row> iterator()
- {
- return new Iterator<Row>()
- {
-
- public boolean hasNext()
- {
- if (done())
- return false;
- for (int i = 0 ; i < position.length ; i++)
- if (position[i] < limit[i])
- return true;
- return false;
- }
-
- public Row next()
- {
- advance();
- return row;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- public boolean done()
- {
- return clusteringComponents[0].isEmpty();
- }
-
- public void markWriteFinished()
- {
- if (done())
- generator.seeds.markFinished(seed);
- else
- generator.seeds.markVisited(seed, position.clone());
- }
-
- public Partition partition()
- {
- return Partition.this;
- }
- }
-
- private static class RandomOrder implements Comparable<RandomOrder>
- {
- final int order = ThreadLocalRandom.current().nextInt();
- final Object value;
- private RandomOrder(Object value)
- {
- this.value = value;
- }
-
- public int compareTo(RandomOrder that)
- {
- return Integer.compare(this.order, that.order);
- }
- }
-
- // calculate a new seed based on the combination of a parent seed and the generated child, to generate
- // any children of this child
- static long seed(Object object, AbstractType type, long seed)
- {
- if (object instanceof ByteBuffer)
- {
- ByteBuffer buf = (ByteBuffer) object;
- for (int i = buf.position() ; i < buf.limit() ; i++)
- seed = (31 * seed) + buf.get(i);
- return seed;
- }
- else if (object instanceof String)
- {
- String str = (String) object;
- for (int i = 0 ; i < str.length() ; i++)
- seed = (31 * seed) + str.charAt(i);
- return seed;
- }
- else if (object instanceof Number)
- {
- return (seed * 31) + ((Number) object).longValue();
- }
- else if (object instanceof UUID)
- {
- return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
- }
- else
- {
- return seed(type.decompose(object), BytesType.instance, seed);
- }
- }
-
- public Object getPartitionKey(int i)
- {
- return partitionKey[i];
- }
-
- public String getKeyAsString()
- {
- StringBuilder sb = new StringBuilder();
- int i = 0;
- for (Object key : partitionKey)
- {
- if (i > 0)
- sb.append("|");
- AbstractType type = generator.partitionKey.get(i++).type;
- sb.append(type.getString(type.decompose(key)));
- }
- return sb.toString();
- }
-
- // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
- public ByteBuffer getToken()
- {
- return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index 128d2f5..9f88068 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.stress.generate;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.NoSuchElementException;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.values.Generator;
public class PartitionGenerator
@@ -46,30 +44,24 @@ public class PartitionGenerator
final List<Generator> partitionKey;
final List<Generator> clusteringComponents;
final List<Generator> valueComponents;
- final int[] clusteringChildAverages;
+ final int[] clusteringDescendantAverages;
+ final int[] clusteringComponentAverages;
private final Map<String, Integer> indexMap;
final Order order;
- final SeedManager seeds;
- final List<Partition> recyclable = new ArrayList<>();
- int partitionsInUse = 0;
-
- public void reset()
- {
- partitionsInUse = 0;
- }
-
- public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order, SeedManager seeds)
+ public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order)
{
this.partitionKey = partitionKey;
this.clusteringComponents = clusteringComponents;
this.valueComponents = valueComponents;
this.order = order;
- this.seeds = seeds;
- this.clusteringChildAverages = new int[clusteringComponents.size()];
- for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--)
- clusteringChildAverages[i] = (int) (i < (clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1);
+ this.clusteringDescendantAverages = new int[clusteringComponents.size()];
+ this.clusteringComponentAverages = new int[clusteringComponents.size()];
+ for (int i = 0 ; i < clusteringComponentAverages.length ; i++)
+ clusteringComponentAverages[i] = (int) clusteringComponents.get(i).clusteringDistribution.average();
+ for (int i = clusteringDescendantAverages.length - 1 ; i >= 0 ; i--)
+ clusteringDescendantAverages[i] = (int) (i < (clusteringDescendantAverages.length - 1) ? clusteringComponentAverages[i + 1] * clusteringDescendantAverages[i + 1] : 1);
double maxRowCount = 1d;
double minRowCount = 1d;
for (Generator component : clusteringComponents)
@@ -101,19 +93,6 @@ public class PartitionGenerator
return i;
}
- public Partition generate(Operation op)
- {
- if (recyclable.size() <= partitionsInUse || recyclable.get(partitionsInUse) == null)
- recyclable.add(new Partition(this));
-
- Seed seed = seeds.next(op);
- if (seed == null)
- return null;
- Partition partition = recyclable.get(partitionsInUse++);
- partition.setSeed(seed);
- return partition;
- }
-
public ByteBuffer convert(int c, Object v)
{
if (c < 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
index f427608..9e2e65b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
@@ -18,50 +18,68 @@
*/
package org.apache.cassandra.stress.generate;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.cassandra.stress.util.DynamicList;
public class Seed implements Comparable<Seed>
{
+ public final int visits;
public final long seed;
- final int visits;
- DynamicList.Node poolNode;
- volatile int[] position;
- volatile State state = State.HELD;
+ private volatile DynamicList.Node poolNode;
+ private volatile int position;
- private static final AtomicReferenceFieldUpdater<Seed, Seed.State> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, "state");
+ private static final AtomicIntegerFieldUpdater<Seed> positionUpdater = AtomicIntegerFieldUpdater.newUpdater(Seed.class, "position");
public int compareTo(Seed that)
{
return Long.compare(this.seed, that.seed);
}
- static enum State
- {
- HELD, AVAILABLE
- }
-
Seed(long seed, int visits)
{
this.seed = seed;
this.visits = visits;
}
- boolean take()
+ public int position()
{
- return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD);
+ return position;
}
- void yield()
+ public int moveForwards(int rowCount)
{
- state = State.AVAILABLE;
+ return positionUpdater.getAndAdd(this, rowCount);
}
- public int[] position()
+ public int hashCode()
{
- return position;
+ return (int) seed;
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof Seed && this.seed == ((Seed) that).seed;
+ }
+
+ public boolean save(DynamicList<Seed> sampleFrom, int maxSize)
+ {
+ DynamicList.Node poolNode = sampleFrom.append(this, maxSize);
+ if (poolNode == null)
+ return false;
+ this.poolNode = poolNode;
+ return true;
+ }
+
+ public boolean isSaved()
+ {
+ return poolNode != null;
+ }
+
+ public void remove(DynamicList<Seed> sampleFrom)
+ {
+ sampleFrom.remove(poolNode);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
index dba721d..071d888 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
@@ -33,9 +33,12 @@ public class SeedManager
final Distribution visits;
final Generator writes;
final Generator reads;
- final ConcurrentHashMap<Seed, Seed> managing = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>();
final DynamicList<Seed> sampleFrom;
final Distribution sample;
+ final long sampleOffset;
+ final int sampleSize;
+ final boolean updateSampleImmediately;
public SeedManager(StressSettings settings)
{
@@ -61,10 +64,15 @@ public class SeedManager
this.visits = settings.insert.visits.get();
this.writes = writes;
this.reads = reads;
- this.sample = DistributionInverted.invert(settings.insert.revisit.get());
- if (sample.maxValue() > Integer.MAX_VALUE || sample.minValue() < 0)
- throw new IllegalArgumentException();
- this.sampleFrom = new DynamicList<>((int) sample.maxValue());
+ Distribution sample = settings.insert.revisit.get();
+ this.sampleOffset = Math.min(sample.minValue(), sample.maxValue());
+ long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset;
+ if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE)
+ throw new IllegalArgumentException("sample range is invalid");
+ this.sampleFrom = new DynamicList<>((int) sampleSize);
+ this.sample = DistributionInverted.invert(sample);
+ this.sampleSize = (int) sampleSize;
+ this.updateSampleImmediately = visits.average() > 1;
}
public Seed next(Operation op)
@@ -80,48 +88,38 @@ public class SeedManager
while (true)
{
- int index = (int) sample.next();
+ int index = (int) (sample.next() - sampleOffset);
Seed seed = sampleFrom.get(index);
- if (seed != null && seed.take())
+ if (seed != null && seed.isSaved())
return seed;
seed = writes.next((int) visits.next());
if (seed == null)
return null;
- // seeds are created HELD, so if we insert it successfully we have it exclusively for our write
- if (managing.putIfAbsent(seed, seed) == null)
- return seed;
+ if (managing.putIfAbsent(seed.seed, seed) == null)
+ {
+ if (!updateSampleImmediately || seed.save(sampleFrom, sampleSize))
+ return seed;
+ managing.remove(seed.seed, seed);
+ }
}
}
- public void markVisited(Seed seed, int[] position)
- {
- boolean first = seed.position == null;
- seed.position = position;
- finishedWriting(seed, first, false);
- }
-
- public void markFinished(Seed seed)
+ public void markLastWrite(Seed seed, boolean first)
{
- finishedWriting(seed, seed.position == null, true);
+ // we could have multiple iterators mark the last write simultaneously,
+ // so we ensure we remove conditionally, and only remove the exact seed we were operating over;
+ // this is important because, to ensure correctness, we do not support calling remove multiple
+ // times on the same DynamicList.Node
+ if (managing.remove(seed.seed, seed) && !first)
+ seed.remove(sampleFrom);
}
- void finishedWriting(Seed seed, boolean first, boolean completed)
+ public void markFirstWrite(Seed seed, boolean last)
{
- if (!completed)
- {
- if (first)
- seed.poolNode = sampleFrom.append(seed);
- seed.yield();
- }
- else
- {
- if (!first)
- sampleFrom.remove(seed.poolNode);
- managing.remove(seed);
- }
- if (first)
- writes.finishWrite(seed);
+ if (!last && !updateSampleImmediately)
+ seed.save(sampleFrom, Integer.MAX_VALUE);
+ writes.finishWrite(seed);
}
private abstract class Generator
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
index 358163c..3c15c87 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
@@ -20,12 +20,11 @@
*/
package org.apache.cassandra.stress.generate.values;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.generate.FasterRandom;
-
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Random;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.generate.FasterRandom;
public class Bytes extends Generator<ByteBuffer>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
index 8f7b2ea..3522338 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
@@ -20,17 +20,14 @@
*/
package org.apache.cassandra.stress.generate.values;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MurmurHash;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
public class GeneratorConfig implements Serializable
{
public final long salt;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
index 71aaae6..b58fee2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
@@ -20,8 +20,6 @@
*/
package org.apache.cassandra.stress.generate.values;
-import java.util.Random;
-
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.stress.generate.FasterRandom;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
index efe4b79..7bfabf5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
@@ -21,11 +21,11 @@
package org.apache.cassandra.stress.generate.values;
+import java.util.UUID;
+
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.utils.UUIDGen;
-import java.util.UUID;
-
public class TimeUUIDs extends Generator<UUID>
{
final Dates dateGen;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 3212795..533b630 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.stress.Operation;
public class FixedOpDistribution implements OpDistribution
{
-
final Operation operation;
public FixedOpDistribution(Operation operation)
@@ -37,10 +36,4 @@ public class FixedOpDistribution implements OpDistribution
{
return operation;
}
-
- public int maxBatchSize()
- {
- return (int) operation.partitionCount.maxValue();
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index bcbd0bf..0fc15a6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -27,6 +27,5 @@ public interface OpDistribution
{
Operation next();
- public int maxBatchSize();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index 0bd64c5..432e991 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.stress.operations;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
-import org.apache.commons.math3.util.Pair;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.Distribution;
@@ -41,14 +40,6 @@ public class SampledOpDistribution implements OpDistribution
this.clustering = clustering;
}
- public int maxBatchSize()
- {
- int max = 1;
- for (Pair<Operation, Double> pair : operations.getPmf())
- max = Math.max(max, (int) pair.getFirst().partitionCount.maxValue());
- return max;
- }
-
public Operation next()
{
while (remaining == 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index b7d1ee7..456c821 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.Timer;
@@ -36,9 +37,9 @@ public class CqlCounterAdder extends CqlOperation<Integer>
{
final Distribution counteradd;
- public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+ public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.COUNTER_WRITE, timer, generator, settings);
+ super(Command.COUNTER_WRITE, timer, generator, seedManager, settings);
this.counteradd = counteradd.get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
index 94c8faf..8c1c65c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.Timer;
@@ -33,9 +34,9 @@ import org.apache.cassandra.stress.util.Timer;
public class CqlCounterGetter extends CqlOperation<Integer>
{
- public CqlCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public CqlCounterGetter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.COUNTER_READ, timer, generator, settings);
+ super(Command.COUNTER_READ, timer, generator, seedManager, settings);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index 622eb14..88ee752 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -26,17 +26,17 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.utils.UUIDGen;
public class CqlInserter extends CqlOperation<Integer>
{
- public CqlInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public CqlInserter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.WRITE, timer, generator, settings);
+ super(Command.WRITE, timer, generator, seedManager, settings);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index 0264cd1..9cea854 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -24,13 +24,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import com.google.common.base.Function;
+
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
-import com.google.common.base.Function;
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.StressMetrics;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.ConnectionStyle;
import org.apache.cassandra.stress.settings.CqlVersion;
@@ -54,9 +54,9 @@ public abstract class CqlOperation<V> extends PredefinedOperation
protected abstract String buildQuery();
protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key);
- public CqlOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+ public CqlOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(type, timer, generator, settings);
+ super(type, timer, generator, seedManager, settings);
if (settings.columns.variableColumnCount)
throw new IllegalStateException("Variable column counts are not implemented for CQL");
}
@@ -168,28 +168,6 @@ public abstract class CqlOperation<V> extends PredefinedOperation
}
}
- // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing
- protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
- {
-
- protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key)
- {
- super(client, query, queryId, KeysHandler.INSTANCE, params, key);
- }
-
- @Override
- public int partitionCount()
- {
- return result.length;
- }
-
- @Override
- public int rowCount()
- {
- return result.length;
- }
- }
-
protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
index 3a7f75a..12cc241 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -22,23 +22,22 @@ package org.apache.cassandra.stress.operations.predefined;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.utils.ByteBufferUtil;
public class CqlReader extends CqlOperation<ByteBuffer[][]>
{
- public CqlReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public CqlReader(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.READ, timer, generator, settings);
+ super(Command.READ, timer, generator, seedManager, settings);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index dba2e51..d5c3edc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -20,26 +20,17 @@ package org.apache.cassandra.stress.operations.predefined;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.StressMetrics;
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.DistributionFixed;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.CqlVersion;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class PredefinedOperation extends Operation
{
@@ -47,13 +38,18 @@ public abstract class PredefinedOperation extends Operation
private final Distribution columnCount;
private Object cqlCache;
- public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings)
+ public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(timer, generator, settings, new DistributionFixed(1));
+ super(timer, settings, spec(generator, seedManager));
this.type = type;
this.columnCount = settings.columns.countDistribution.get();
}
+ private static DataSpec spec(PartitionGenerator generator, SeedManager seedManager)
+ {
+ return new DataSpec(generator, seedManager, new DistributionFixed(1), 1);
+ }
+
public boolean isCql3()
{
return settings.mode.cqlVersion == CqlVersion.CQL3;
@@ -174,7 +170,7 @@ public abstract class PredefinedOperation extends Operation
protected List<ByteBuffer> getColumnValues(ColumnSelection columns)
{
- Row row = partitions.get(0).iterator(1, false).next().iterator().next();
+ Row row = partitions.get(0).next();
ByteBuffer[] r = new ByteBuffer[columns.count()];
int c = 0;
if (columns.indices != null)
@@ -186,7 +182,7 @@ public abstract class PredefinedOperation extends Operation
return Arrays.asList(r);
}
- public static Operation operation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings, DistributionFactory counteradd)
+ public static Operation operation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings, DistributionFactory counteradd)
{
switch (type)
{
@@ -194,10 +190,10 @@ public abstract class PredefinedOperation extends Operation
switch(settings.mode.style)
{
case THRIFT:
- return new ThriftReader(timer, generator, settings);
+ return new ThriftReader(timer, generator, seedManager, settings);
case CQL:
case CQL_PREPARED:
- return new CqlReader(timer, generator, settings);
+ return new CqlReader(timer, generator, seedManager, settings);
default:
throw new UnsupportedOperationException();
}
@@ -207,10 +203,10 @@ public abstract class PredefinedOperation extends Operation
switch(settings.mode.style)
{
case THRIFT:
- return new ThriftCounterGetter(timer, generator, settings);
+ return new ThriftCounterGetter(timer, generator, seedManager, settings);
case CQL:
case CQL_PREPARED:
- return new CqlCounterGetter(timer, generator, settings);
+ return new CqlCounterGetter(timer, generator, seedManager, settings);
default:
throw new UnsupportedOperationException();
}
@@ -220,10 +216,10 @@ public abstract class PredefinedOperation extends Operation
switch(settings.mode.style)
{
case THRIFT:
- return new ThriftInserter(timer, generator, settings);
+ return new ThriftInserter(timer, generator, seedManager, settings);
case CQL:
case CQL_PREPARED:
- return new CqlInserter(timer, generator, settings);
+ return new CqlInserter(timer, generator, seedManager, settings);
default:
throw new UnsupportedOperationException();
}
@@ -232,10 +228,10 @@ public abstract class PredefinedOperation extends Operation
switch(settings.mode.style)
{
case THRIFT:
- return new ThriftCounterAdder(counteradd, timer, generator, settings);
+ return new ThriftCounterAdder(counteradd, timer, generator, seedManager, settings);
case CQL:
case CQL_PREPARED:
- return new CqlCounterAdder(counteradd, timer, generator, settings);
+ return new CqlCounterAdder(counteradd, timer, generator, seedManager, settings);
default:
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
index 4ee42e9..be34a07 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -27,19 +27,22 @@ import java.util.Map;
import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
public class ThriftCounterAdder extends PredefinedOperation
{
final Distribution counteradd;
- public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings)
+ public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.COUNTER_WRITE, timer, generator, settings);
+ super(Command.COUNTER_WRITE, timer, generator, seedManager, settings);
this.counteradd = counteradd.get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
index 10c6aab..ca81fe9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
@@ -31,9 +32,9 @@ import org.apache.cassandra.thrift.SlicePredicate;
public class ThriftCounterGetter extends PredefinedOperation
{
- public ThriftCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public ThriftCounterGetter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.COUNTER_READ, timer, generator, settings);
+ super(Command.COUNTER_READ, timer, generator, seedManager, settings);
}
public void run(final ThriftClient client) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
index d6adbf9..1827c06 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -24,22 +24,23 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
public final class ThriftInserter extends PredefinedOperation
{
- public ThriftInserter(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public ThriftInserter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.WRITE, timer, generator, settings);
+ super(Command.WRITE, timer, generator, seedManager, settings);
}
public boolean isWrite()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
index 276d8c5..d77dc6a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -22,21 +22,20 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SuperColumn;
public final class ThriftReader extends PredefinedOperation
{
- public ThriftReader(Timer timer, PartitionGenerator generator, StressSettings settings)
+ public ThriftReader(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
- super(Command.READ, timer, generator, settings);
+ super(Command.READ, timer, generator, seedManager, settings);
}
public void run(final ThriftClient client) throws IOException