You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/03/10 16:26:35 UTC
[7/8] cassandra git commit: Establish and implement canonical bulk
reading workload(s)
Establish and implement canonical bulk reading workload(s)
patch by Stefania Alborghetti; reviewed by Jake Luciani for CASSANDRA-10331
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f27ab290
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f27ab290
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f27ab290
Branch: refs/heads/cassandra-3.5
Commit: f27ab2908a06056b332e3a87008f0f8560a9620b
Parents: 9fd8dfc
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Mar 2 17:40:20 2016 +0800
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu Mar 10 10:23:51 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
tools/cqlstress-example.yaml | 13 +
tools/stress/README.txt | 5 +
.../org/apache/cassandra/stress/Operation.java | 91 +------
.../apache/cassandra/stress/StressAction.java | 38 ++-
.../apache/cassandra/stress/StressMetrics.java | 2 +-
.../apache/cassandra/stress/StressProfile.java | 62 ++++-
.../org/apache/cassandra/stress/StressYaml.java | 8 +
.../apache/cassandra/stress/WorkManager.java | 2 +-
.../stress/generate/TokenRangeIterator.java | 70 +++++
.../operations/OpDistributionFactory.java | 2 +-
.../stress/operations/PartitionOperation.java | 130 +++++++++
.../SampledOpDistributionFactory.java | 16 +-
.../predefined/PredefinedOperation.java | 3 +-
.../operations/userdefined/SchemaStatement.java | 3 +-
.../operations/userdefined/TokenRangeQuery.java | 270 +++++++++++++++++++
.../userdefined/ValidatingSchemaQuery.java | 3 +-
.../cassandra/stress/settings/CliOption.java | 3 +-
.../settings/SettingsCommandPreDefined.java | 2 +-
.../SettingsCommandPreDefinedMixed.java | 2 +-
.../stress/settings/SettingsCommandUser.java | 14 +-
.../stress/settings/SettingsTokenRange.java | 77 ++++++
.../stress/settings/StressSettings.java | 8 +-
23 files changed, 701 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58c7ed0..68f5714 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.5
Merged from 3.0:
+ * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
* Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208)
* Remove recursive call from CompositesSearcher (CASSANDRA-11304)
* Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index 3c60c32..835a4cb 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -93,3 +93,16 @@ queries:
range1:
cql: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
fields: multirow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
+
+
+#
+# A list of bulk read queries that analytics tools may perform against the schema
+# Each query will sweep an entire token range, page by page.
+#
+token_range_queries:
+ all_columns_tr_query:
+ columns: '*'
+ page_size: 5000
+
+ value_tr_query:
+ columns: value
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/stress/README.txt b/tools/stress/README.txt
index 0046b25..aa89dab 100644
--- a/tools/stress/README.txt
+++ b/tools/stress/README.txt
@@ -72,6 +72,11 @@ Primary Options:
The port to connect to cassandra nodes on
-sendto:
Specify a stress server to send this command to
+ -graph:
+ Graph recorded metrics
+ -tokenrange:
+ Token range settings
+
Suboptions:
Every command and primary option has its own collection of suboptions. These are too numerous to list here.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 139dd53..0f13d3c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,13 +19,9 @@
package org.apache.cassandra.stress;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
import org.apache.cassandra.stress.settings.SettingsLog;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -38,45 +34,11 @@ public abstract class Operation
{
public final StressSettings settings;
public final Timer timer;
- protected final DataSpec spec;
- private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
- private final List<PartitionIterator> partitionCache = new ArrayList<>();
- protected List<PartitionIterator> partitions;
-
- public static final class DataSpec
- {
- public final PartitionGenerator partitionGenerator;
- final SeedManager seedManager;
- final Distribution partitionCount;
- final RatioDistribution useRatio;
- final RatioDistribution rowPopulationRatio;
- final Integer targetCount;
-
- public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
- {
- this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
- }
- public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
- {
- this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
- }
- private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
- {
- this.partitionGenerator = partitionGenerator;
- this.seedManager = seedManager;
- this.partitionCount = partitionCount;
- this.useRatio = useRatio;
- this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
- this.targetCount = targetCount;
- }
- }
-
- public Operation(Timer timer, StressSettings settings, DataSpec spec)
+ public Operation(Timer timer, StressSettings settings)
{
this.timer = timer;
this.settings = settings;
- this.spec = spec;
}
public static interface RunOp
@@ -86,48 +48,7 @@ public abstract class Operation
public int rowCount();
}
- boolean ready(WorkManager permits, RateLimiter rateLimiter)
- {
- int partitionCount = (int) spec.partitionCount.next();
- if (partitionCount <= 0)
- return false;
- partitionCount = permits.takePermits(partitionCount);
- if (partitionCount <= 0)
- return false;
-
- int i = 0;
- boolean success = true;
- for (; i < partitionCount && success ; i++)
- {
- if (i >= partitionCache.size())
- partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
-
- success = false;
- while (!success)
- {
- Seed seed = spec.seedManager.next(this);
- if (seed == null)
- break;
-
- success = reset(seed, partitionCache.get(i));
- }
- }
- partitionCount = i;
-
- if (rateLimiter != null)
- rateLimiter.acquire(partitionCount);
-
- partitions = partitionCache.subList(0, partitionCount);
- return !partitions.isEmpty();
- }
-
- protected boolean reset(Seed seed, PartitionIterator iterator)
- {
- if (spec.useRatio == null)
- return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
- else
- return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
- }
+ public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
public boolean isWrite()
{
@@ -202,13 +123,7 @@ public abstract class Operation
}
- private String key()
- {
- List<String> keys = new ArrayList<>();
- for (PartitionIterator partition : partitions)
- keys.add(partition.getKeyAsString());
- return keys.toString();
- }
+ public abstract String key();
protected String getExceptionMessage(Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 3150e14..ebe6270 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -71,7 +71,7 @@ public class StressAction implements Runnable
success = runMulti(settings.rate.auto, rateLimiter);
else
success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count,
- settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+ settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
if (success)
output.println("END");
@@ -84,12 +84,12 @@ public class StressAction implements Runnable
// type provided separately to support recursive call for mixed command with each command type it is performing
private void warmup(OpDistributionFactory operations)
{
- // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
- int iterations = 50000 * settings.node.nodes.size();
+ // do 25% of iterations as warmup but no more than 50k (by default hotspot compiles methods after 10k invocations)
+ int iterations = (settings.command.count > 0
+ ? Math.min(50000, (int)(settings.command.count * 0.25))
+ : 50000) * settings.node.nodes.size();
int threads = 100;
- if (iterations > settings.command.count && settings.command.count > 0)
- return;
if (settings.rate.maxThreads > 0)
threads = Math.min(threads, settings.rate.maxThreads);
@@ -101,7 +101,7 @@ public class StressAction implements Runnable
// we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
// so warm up all the nodes we're speaking to only.
output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
- run(single, threads, iterations, 0, null, null, warmupOutput);
+ run(single, threads, iterations, 0, null, null, warmupOutput, true);
}
}
@@ -124,7 +124,7 @@ public class StressAction implements Runnable
settings.command.truncateTables(settings);
StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
- settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+ settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
if (result == null)
return false;
results.add(result);
@@ -181,7 +181,14 @@ public class StressAction implements Runnable
return improvement / count;
}
- private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output)
+ private StressMetrics run(OpDistributionFactory operations,
+ int threadCount,
+ long opCount,
+ long duration,
+ RateLimiter rateLimiter,
+ TimeUnit durationUnits,
+ PrintStream output,
+ boolean isWarmup)
{
output.println(String.format("Running %s with %d threads %s",
operations.desc(),
@@ -199,10 +206,12 @@ public class StressAction implements Runnable
final CountDownLatch done = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
+ int sampleCount = settings.samples.liveCount / threadCount;
for (int i = 0; i < threadCount; i++)
{
- consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
- settings.samples.liveCount / threadCount);
+
+ consumers[i] = new Consumer(operations.get(metrics.getTiming(), sampleCount, isWarmup),
+ done, workManager, metrics, rateLimiter);
}
// starting worker threadCount
@@ -259,14 +268,17 @@ public class StressAction implements Runnable
private final WorkManager workManager;
private final CountDownLatch done;
- public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
- RateLimiter rateLimiter, int sampleCount)
+ public Consumer(OpDistribution operations,
+ CountDownLatch done,
+ WorkManager workManager,
+ StressMetrics metrics,
+ RateLimiter rateLimiter)
{
this.done = done;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
this.metrics = metrics;
- this.operations = operations.get(metrics.getTiming(), sampleCount);
+ this.operations = operations;
}
public void run()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 0190ed8..3585a00 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -158,7 +158,7 @@ public class StressMetrics
TimingInterval current = result.intervals.combine(settings.samples.reportCount);
TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
rowRateUncertainty.update(current.adjustedRowRate());
- if (current.partitionCount != 0)
+ if (current.operationCount != 0)
{
if (result.intervals.intervals().size() > 1)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 297a004..5243d96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.generate.values.*;
+import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
@@ -66,9 +67,11 @@ public class StressProfile implements Serializable
public String tableName;
private Map<String, GeneratorConfig> columnConfigs;
private Map<String, StressYaml.QueryDef> queries;
+ public Map<String, StressYaml.TokenRangeQueryDef> tokenRangeQueries;
private Map<String, String> insert;
transient volatile TableMetadata tableMetaData;
+ transient volatile Set<TokenRange> tokenRanges;
transient volatile GeneratorFactory generatorFactory;
@@ -92,6 +95,7 @@ public class StressProfile implements Serializable
tableCql = yaml.table_definition;
seedStr = "seed for stress";
queries = yaml.queries;
+ tokenRangeQueries = yaml.token_range_queries;
insert = yaml.insert;
extraSchemaDefinitions = yaml.extra_definitions;
@@ -100,6 +104,10 @@ public class StressProfile implements Serializable
assert tableName != null : "table name is required in yaml file";
assert queries != null : "queries map is required in yaml file";
+ for (String query : queries.keySet())
+ assert !tokenRangeQueries.containsKey(query) :
+ String.format("Found %s in both queries and token_range_queries, please use different names", query);
+
if (keyspaceCql != null && keyspaceCql.length() > 0)
{
try
@@ -254,8 +262,48 @@ public class StressProfile implements Serializable
}
}
- public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings)
+ public Set<TokenRange> maybeLoadTokenRanges(StressSettings settings)
{
+ maybeLoadSchemaInfo(settings); // ensure table metadata is available
+
+ JavaDriverClient client = settings.getJavaDriverClient();
+ synchronized (client)
+ {
+ if (tokenRanges != null)
+ return tokenRanges;
+
+ Cluster cluster = client.getCluster();
+ Metadata metadata = cluster.getMetadata();
+ if (metadata == null)
+ throw new RuntimeException("Unable to get metadata");
+
+ List<TokenRange> sortedRanges = new ArrayList<>(metadata.getTokenRanges().size() + 1);
+ for (TokenRange range : metadata.getTokenRanges())
+ {
+ //if we don't unwrap we miss the partitions between ring min and smallest range start value
+ if (range.isWrappedAround())
+ sortedRanges.addAll(range.unwrap());
+ else
+ sortedRanges.add(range);
+ }
+
+ Collections.sort(sortedRanges);
+ tokenRanges = new LinkedHashSet<>(sortedRanges);
+ return tokenRanges;
+ }
+ }
+
+ public Operation getQuery(String name,
+ Timer timer,
+ PartitionGenerator generator,
+ SeedManager seeds,
+ StressSettings settings,
+ boolean isWarmup)
+ {
+ name = name.toLowerCase();
+ if (!queries.containsKey(name))
+ throw new IllegalArgumentException("No query defined with name " + name);
+
if (queryStatements == null)
{
synchronized (this)
@@ -296,13 +344,19 @@ public class StressProfile implements Serializable
}
}
- name = name.toLowerCase();
- if (!queryStatements.containsKey(name))
- throw new IllegalArgumentException("No query defined with name " + name);
return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name),
ThriftConversion.fromThrift(settings.command.consistencyLevel), argSelects.get(name));
}
+ public Operation getBulkReadQueries(String name, Timer timer, StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean isWarmup)
+ {
+ StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name);
+ if (def == null)
+ throw new IllegalArgumentException("No bulk read query defined with name " + name);
+
+ return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
+ }
+
public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
if (insertStatement == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index 90797b4..214e56a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -20,6 +20,7 @@
*/
package org.apache.cassandra.stress;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ public class StressYaml
public List<Map<String, Object>> columnspec;
public Map<String, QueryDef> queries;
public Map<String, String> insert;
+ public Map<String, TokenRangeQueryDef> token_range_queries = new HashMap<>();
public static class QueryDef
{
@@ -42,4 +44,10 @@ public class StressYaml
public String fields;
}
+ public static class TokenRangeQueryDef
+ {
+ public String columns;
+ public int page_size = 5000;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
index c6a3eee..78d4176 100644
--- a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -2,7 +2,7 @@ package org.apache.cassandra.stress;
import java.util.concurrent.atomic.AtomicLong;
-interface WorkManager
+public interface WorkManager
{
// -1 indicates consumer should terminate
int takePermits(int count);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
new file mode 100644
index 0000000..5ddac61
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.generate;
+
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.settings.StressSettings;
+
+public class TokenRangeIterator
+{
+ private final Set<TokenRange> tokenRanges;
+ private final ConcurrentLinkedQueue<TokenRange> pendingRanges;
+ private final boolean wrap;
+
+ public TokenRangeIterator(StressSettings settings, Set<TokenRange> tokenRanges)
+ {
+ this.tokenRanges = maybeSplitRanges(tokenRanges, settings.tokenRange.splitFactor);
+ this.pendingRanges = new ConcurrentLinkedQueue<>(this.tokenRanges);
+ this.wrap = settings.tokenRange.wrap;
+ }
+
+ private static Set<TokenRange> maybeSplitRanges(Set<TokenRange> tokenRanges, int splitFactor)
+ {
+ if (splitFactor <= 1)
+ return tokenRanges;
+
+ Set<TokenRange> ret = new TreeSet<>();
+ for (TokenRange range : tokenRanges)
+ ret.addAll(range.splitEvenly(splitFactor));
+
+ return ret;
+ }
+
+ public void update()
+ {
+ // we may race and add to the queue twice but no bad consequence so it's fine if that happens
+ // as ultimately only the permits determine when to stop if wrap is true
+ if (wrap && pendingRanges.isEmpty())
+ pendingRanges.addAll(tokenRanges);
+ }
+
+ public TokenRange next()
+ {
+ return pendingRanges.poll();
+ }
+
+ public boolean exhausted()
+ {
+ return pendingRanges.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 7e13fcd..5fbb0f9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.stress.util.Timing;
public interface OpDistributionFactory
{
- public OpDistribution get(Timing timing, int sampleCount);
+ public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup);
public String desc();
Iterable<OpDistributionFactory> each();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
new file mode 100644
index 0000000..45c36f2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.PartitionIterator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Seed;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public abstract class PartitionOperation extends Operation
+{
+ protected final DataSpec spec;
+ private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
+
+ private final List<PartitionIterator> partitionCache = new ArrayList<>();
+ protected List<PartitionIterator> partitions;
+
+ public static final class DataSpec
+ {
+ public final PartitionGenerator partitionGenerator;
+ final SeedManager seedManager;
+ final Distribution partitionCount;
+ final RatioDistribution useRatio;
+ final RatioDistribution rowPopulationRatio;
+ final Integer targetCount;
+
+ public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
+ {
+ this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
+ }
+ public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
+ {
+ this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
+ }
+ private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
+ {
+ this.partitionGenerator = partitionGenerator;
+ this.seedManager = seedManager;
+ this.partitionCount = partitionCount;
+ this.useRatio = useRatio;
+ this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
+ this.targetCount = targetCount;
+ }
+ }
+
+ public PartitionOperation(Timer timer, StressSettings settings, DataSpec spec)
+ {
+ super(timer, settings);
+ this.spec = spec;
+ }
+
+ public boolean ready(WorkManager permits, RateLimiter rateLimiter)
+ {
+ int partitionCount = (int) spec.partitionCount.next();
+ if (partitionCount <= 0)
+ return false;
+ partitionCount = permits.takePermits(partitionCount);
+ if (partitionCount <= 0)
+ return false;
+
+ int i = 0;
+ boolean success = true;
+ for (; i < partitionCount && success ; i++)
+ {
+ if (i >= partitionCache.size())
+ partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
+
+ success = false;
+ while (!success)
+ {
+ Seed seed = spec.seedManager.next(this);
+ if (seed == null)
+ break;
+
+ success = reset(seed, partitionCache.get(i));
+ }
+ }
+ partitionCount = i;
+
+ if (rateLimiter != null)
+ rateLimiter.acquire(partitionCount);
+
+ partitions = partitionCache.subList(0, partitionCount);
+ return !partitions.isEmpty();
+ }
+
+ protected boolean reset(Seed seed, PartitionIterator iterator)
+ {
+ if (spec.useRatio == null)
+ return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
+ else
+ return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
+ }
+
+ public String key()
+ {
+ List<String> keys = new ArrayList<>();
+ for (PartitionIterator partition : partitions)
+ keys.add(partition.getKeyAsString());
+ return keys.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 194f84f..a10585d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -44,16 +44,17 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
this.clustering = clustering;
}
- protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key);
+ protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
protected abstract PartitionGenerator newGenerator();
- public OpDistribution get(Timing timing, int sampleCount)
+ public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
{
PartitionGenerator generator = newGenerator();
List<Pair<Operation, Double>> operations = new ArrayList<>();
for (Map.Entry<T, Double> ratio : ratios.entrySet())
{
- List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey());
+ List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+ generator, ratio.getKey(), isWarmup);
for (Operation op : ops)
operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
}
@@ -75,15 +76,18 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
{
out.add(new OpDistributionFactory()
{
- public OpDistribution get(Timing timing, int sampleCount)
+ public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
{
- List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey());
+ List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+ newGenerator(),
+ ratio.getKey(),
+ isWarmup);
if (ops.size() == 1)
return new FixedOpDistribution(ops.get(0));
List<Pair<Operation, Double>> ratios = new ArrayList<>();
for (Operation op : ops)
ratios.add(new Pair<>(op, 1d / ops.size()));
- return new SampledOpDistribution(new EnumeratedDistribution<Operation>(ratios), new DistributionFixed(1));
+ return new SampledOpDistribution(new EnumeratedDistribution<>(ratios), new DistributionFixed(1));
}
public String desc()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index b435abb..1f9a2c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.operations.PartitionOperation;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.CqlVersion;
import org.apache.cassandra.stress.settings.StressSettings;
@@ -32,7 +33,7 @@ import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-public abstract class PredefinedOperation extends Operation
+public abstract class PredefinedOperation extends PartitionOperation
{
public static final byte[] EMPTY_BYTE_ARRAY = {};
public final Command type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 1c88490..166d689 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -32,11 +32,12 @@ import com.datastax.driver.core.PreparedStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.Timer;
-public abstract class SchemaStatement extends Operation
+public abstract class SchemaStatement extends PartitionOperation
{
final PreparedStatement statement;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
new file mode 100644
index 0000000..60a6c48
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.naming.OperationNotSupportedException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressYaml;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+/**
+ * A stress operation that reads a table one token range at a time, paging through each range with the
+ * Java driver and counting the rows and the distinct partitions that were retrieved.
+ * <p>
+ * The token range a thread is currently paging through is kept in a thread-local {@link State}; each
+ * call to {@link #run(JavaDriverClient)} executes the range query once and processes the rows of the
+ * page that was returned. Thrift is not supported.
+ */
+public class TokenRangeQuery extends Operation
+{
+    /** The token range currently being paged through by the calling thread, or null if there is none. */
+    private final ThreadLocal<State> currentState = new ThreadLocal<>();
+
+    private final TableMetadata tableMetadata;
+    private final TokenRangeIterator tokenRangeIterator;
+    private final String columns;
+    private final int pageSize;
+    private final boolean isWarmup;
+
+    /**
+     * @param timer the timer handed to the parent {@link Operation}
+     * @param settings the stress settings handed to the parent {@link Operation}
+     * @param tableMetadata metadata of the table to read, used for the table, column and partition key names
+     * @param tokenRangeIterator the source of the token ranges to query
+     * @param def the query definition, supplying the columns to select and the page size
+     * @param isWarmup if true the page size is capped at 100 and only one page is read per token range
+     */
+    public TokenRangeQuery(Timer timer,
+                           StressSettings settings,
+                           TableMetadata tableMetadata,
+                           TokenRangeIterator tokenRangeIterator,
+                           StressYaml.TokenRangeQueryDef def,
+                           boolean isWarmup)
+    {
+        super(timer, settings);
+        this.tableMetadata = tableMetadata;
+        this.tokenRangeIterator = tokenRangeIterator;
+        this.columns = sanitizeColumns(def.columns, tableMetadata);
+        this.pageSize = isWarmup ? Math.min(100, def.page_size) : def.page_size;
+        this.isWarmup = isWarmup;
+    }
+
+    /**
+     * We need to specify the columns by name because we need to add token(partition_keys) in order to count
+     * partitions. So if the user specifies '*' then replace it with a list of all columns.
+     *
+     * @param columns the column selection as written by the user
+     * @param tableMetadata metadata of the table, used to enumerate the column names
+     * @return {@code columns} unchanged unless it is exactly "*", in which case a comma separated
+     *         list of the names of all the columns of the table
+     */
+    private static String sanitizeColumns(String columns, TableMetadata tableMetadata)
+    {
+        if (!columns.equals("*"))
+            return columns;
+
+        return tableMetadata.getColumns()
+                            .stream()
+                            .map(ColumnMetadata::getName)
+                            .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * The state of a token range currently being retrieved.
+     * Here we store the paging state to retrieve more pages
+     * and we keep track of which partitions have already been retrieved.
+     */
+    private final static class State
+    {
+        public final TokenRange tokenRange;
+        public final String query;
+        /** Paging state reported by the last execution of the query, null before the first page. */
+        public PagingState pagingState;
+        /** Tokens of the partitions seen so far in this range, so that each one is counted once. */
+        public final Set<Token> partitions = new HashSet<>();
+
+        public State(TokenRange tokenRange, String query)
+        {
+            this.tokenRange = tokenRange;
+            this.query = query;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[%s, %s]", tokenRange.getStart(), tokenRange.getEnd());
+        }
+    }
+
+    /** Base class of the runners, holding the counters reported back through {@link RunOp}. */
+    abstract static class Runner implements RunOp
+    {
+        int partitionCount;
+        int rowCount;
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+    /** Executes the token range query through the Java driver, one page per invocation. */
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        /**
+         * Picks up the token range of the calling thread (or starts a new one), executes its query
+         * resuming from the stored paging state, and counts the rows and the new partitions of the
+         * page received.
+         *
+         * @return always true, also when there are no token ranges left to process
+         */
+        public boolean run() throws Exception
+        {
+            State state = currentState.get();
+            if (state == null)
+            { // start processing a new token range
+                TokenRange range = tokenRangeIterator.next();
+                if (range == null)
+                    return true; // no more token ranges to process
+
+                state = new State(range, buildQuery(range));
+                currentState.set(state);
+            }
+
+            Statement statement = new SimpleStatement(state.query);
+            statement.setFetchSize(pageSize);
+
+            if (state.pagingState != null)
+                statement.setPagingState(state.pagingState);
+
+            ResultSet results = client.getSession().execute(statement);
+            state.pagingState = results.getExecutionInfo().getPagingState();
+
+            int available = results.getAvailableWithoutFetching();
+            rowCount += available;
+
+            // Consume exactly the rows that are already available. A counted loop, rather than iterating
+            // the result set and breaking once a decremented counter reaches zero, also copes with a page
+            // that has no rows: a counter starting at zero would never reach zero again, so the iteration
+            // would run past the current page and add to partitionCount without adding to rowCount.
+            for (int i = 0; i < available; i++)
+            {
+                // this call will only succeed if we've added token(partition keys) to the query
+                Token partition = results.one().getPartitionKeyToken();
+                if (state.partitions.add(partition))
+                    partitionCount += 1;
+            }
+
+            if (results.isExhausted() || isWarmup)
+            { // no more pages to fetch or just warming up, ready to move on to another token range
+                currentState.set(null);
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Builds the CQL query that selects token(partition key) followed by the configured columns from the
+     * table, restricted to the tokens greater than the start and less than or equal to the end of the
+     * range. A bound that is null is left out of the WHERE clause.
+     *
+     * @param tokenRange the range of tokens to select
+     * @return the query string
+     */
+    private String buildQuery(TokenRange tokenRange)
+    {
+        Token start = tokenRange.getStart();
+        Token end = tokenRange.getEnd();
+        List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+        String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));
+
+        StringBuilder ret = new StringBuilder();
+        ret.append("SELECT ");
+        ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
+        ret.append(", ");
+        ret.append(columns);
+        ret.append(" FROM ");
+        ret.append(tableMetadata.getName());
+        if (start != null || end != null)
+            ret.append(" WHERE ");
+        if (start != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" > ");
+            ret.append(start.toString());
+        }
+
+        if (start != null && end != null)
+            ret.append(" AND ");
+
+        if (end != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" <= ");
+            ret.append(end.toString());
+        }
+
+        return ret.toString();
+    }
+
+    /** Placeholder runner for thrift clients; running it always fails because bulk reads need the Java driver. */
+    private static class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            throw new OperationNotSupportedException("Bulk read over thrift not supported");
+        }
+    }
+
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+    /**
+     * Decides whether the calling thread may run this operation once more.
+     * <p>
+     * After updating the token range iterator, the operation is not ready when the iterator is exhausted
+     * and the calling thread has no token range in progress. Otherwise one permit is requested from the
+     * work manager and, if a rate limiter was given and permits were obtained, the same number of permits
+     * is acquired from the rate limiter.
+     *
+     * @param workManager the source of work permits
+     * @param rateLimiter an optional rate limiter, may be null
+     * @return true if a permit was obtained from the work manager
+     */
+    public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
+    {
+        tokenRangeIterator.update();
+
+        if (tokenRangeIterator.exhausted() && currentState.get() == null)
+            return false;
+
+        int numLeft = workManager.takePermits(1);
+        if (rateLimiter != null && numLeft > 0 )
+            rateLimiter.acquire(numLeft);
+
+        return numLeft > 0;
+    }
+
+    /**
+     * @return the bounds of the token range the calling thread is working on, formatted as
+     *         "[start, end]", or "-" if it has none
+     */
+    public String key()
+    {
+        State state = currentState.get();
+        return state == null ? "-" : state.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index c07328a..33f6f80 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
@@ -44,7 +45,7 @@ import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.cassandra.utils.Pair;
import org.apache.thrift.TException;
-public class ValidatingSchemaQuery extends Operation
+public class ValidatingSchemaQuery extends PartitionOperation
{
private Pair<Row, Row> bounds;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 30933d9..6d8e184 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -39,7 +39,8 @@ public enum CliOption
TRANSPORT("Custom transport factories", SettingsTransport.helpPrinter()),
PORT("The port to connect to cassandra nodes on", SettingsPort.helpPrinter()),
SENDTO("-send-to", "Specify a stress server to send this command to", SettingsMisc.sendToDaemonHelpPrinter()),
- GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter())
+ GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter()),
+ TOKENRANGE("Token range settings", SettingsTokenRange.helpPrinter())
;
private static final Map<String, CliOption> LOOKUP;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index 83f444c..c2f2591 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -51,7 +51,7 @@ public class SettingsCommandPreDefined extends SettingsCommand
final SeedManager seeds = new SeedManager(settings);
return new OpDistributionFactory()
{
- public OpDistribution get(Timing timing, int sampleCount)
+ public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
{
return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
newGenerator(settings), seeds, settings, add));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 861b1a4..dd11452 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -55,7 +55,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
final SeedManager seeds = new SeedManager(settings);
return new SampledOpDistributionFactory<Command>(ratios, clustering)
{
- protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key)
+ protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key, boolean isWarmup)
{
return Collections.singletonList(PredefinedOperation.operation(key, timer, generator, seeds, settings, add));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index 8440e8e..36cbefe 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.stress.StressProfile;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
import org.apache.cassandra.stress.util.Timer;
@@ -69,15 +70,24 @@ public class SettingsCommandUser extends SettingsCommand
public OpDistributionFactory getFactory(final StressSettings settings)
{
final SeedManager seeds = new SeedManager(settings);
+ final TokenRangeIterator tokenRangeIterator = profile.tokenRangeQueries.isEmpty()
+ ? null
+ : new TokenRangeIterator(settings,
+ profile.maybeLoadTokenRanges(settings));
+
return new SampledOpDistributionFactory<String>(ratios, clustering)
{
- protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key)
+ protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key, boolean isWarmup)
{
if (key.equalsIgnoreCase("insert"))
return Collections.singletonList(profile.getInsert(timer, generator, seeds, settings));
if (key.equalsIgnoreCase("validate"))
return profile.getValidate(timer, generator, seeds, settings);
- return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings));
+
+ if (profile.tokenRangeQueries.containsKey(key))
+ return Collections.singletonList(profile.getBulkReadQueries(key, timer, settings, tokenRangeIterator, isWarmup));
+
+ return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings, isWarmup));
}
protected PartitionGenerator newGenerator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
new file mode 100644
index 0000000..8fb0048
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+
+/**
+ * The settings of the {@code -tokenrange} command line option group: whether token ranges are re-used
+ * ({@code wrap}) and the factor by which every token range is split ({@code split-factor}).
+ * <p>
+ * The class is serializable because an instance is stored in a field of {@code StressSettings}, which
+ * itself implements {@link Serializable}; a non-serializable field would make serializing the settings fail.
+ */
+public class SettingsTokenRange implements Serializable
+{
+    /** True if the user specified the {@code wrap} option. */
+    public final boolean wrap;
+    /** The value of the {@code split-factor} option, 1 unless overridden by the user. */
+    public final int splitFactor;
+
+    /**
+     * @param options the parsed (or default) option group
+     * @throws IllegalArgumentException if the split factor does not fit into an int
+     */
+    public SettingsTokenRange(TokenRangeOptions options)
+    {
+        this.wrap = options.wrap.setByUser();
+        this.splitFactor = Ints.checkedCast(OptionDistribution.parseLong(options.splitFactor.value()));
+    }
+
+    /** The options accepted after {@code -tokenrange} on the command line. */
+    private static final class TokenRangeOptions extends GroupedOptions
+    {
+        final OptionSimple wrap = new OptionSimple("wrap", "", null, "Re-use token ranges in order to terminate stress iterations", false);
+        final OptionSimple splitFactor = new OptionSimple("split-factor=", "[0-9]+[bmk]?", "1", "Split every token range by this factor", false);
+
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return ImmutableList.<Option>builder().add(wrap, splitFactor).build();
+        }
+    }
+
+    /**
+     * Creates the settings from the command line arguments, removing the {@code -tokenrange} entry from
+     * the map. If the entry is absent the defaults are used; if its parameters cannot be matched against
+     * the options, the help is printed and the process exits with status 1.
+     *
+     * @param clArgs the command line arguments grouped by option name; modified by this call
+     * @return the token range settings
+     */
+    public static SettingsTokenRange get(Map<String, String[]> clArgs)
+    {
+        String[] params = clArgs.remove("-tokenrange");
+        if (params == null)
+        {
+            return new SettingsTokenRange(new TokenRangeOptions());
+        }
+        TokenRangeOptions options = GroupedOptions.select(params, new TokenRangeOptions());
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -tokenrange options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsTokenRange(options);
+    }
+
+    /** Prints the description of the {@code -tokenrange} options to standard output. */
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-tokenrange", new TokenRangeOptions());
+    }
+
+    /**
+     * @return a runnable that calls {@link #printHelp()}
+     */
+    public static Runnable helpPrinter()
+    {
+        return SettingsTokenRange::printHelp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index 26f65b5..6625bc8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -55,6 +55,7 @@ public class StressSettings implements Serializable
public final SettingsPort port;
public final String sendToDaemon;
public final SettingsGraph graph;
+ public final SettingsTokenRange tokenRange;
public StressSettings(SettingsCommand command,
SettingsRate rate,
@@ -70,7 +71,8 @@ public class StressSettings implements Serializable
SettingsTransport transport,
SettingsPort port,
String sendToDaemon,
- SettingsGraph graph)
+ SettingsGraph graph,
+ SettingsTokenRange tokenRange)
{
this.command = command;
this.rate = rate;
@@ -87,6 +89,7 @@ public class StressSettings implements Serializable
this.port = port;
this.sendToDaemon = sendToDaemon;
this.graph = graph;
+ this.tokenRange = tokenRange;
}
private SmartThriftClient tclient;
@@ -269,6 +272,7 @@ public class StressSettings implements Serializable
SettingsPort port = SettingsPort.get(clArgs);
SettingsRate rate = SettingsRate.get(clArgs, command);
SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
+ SettingsTokenRange tokenRange = SettingsTokenRange.get(clArgs);
SettingsInsert insert = SettingsInsert.get(clArgs);
SettingsColumn columns = SettingsColumn.get(clArgs);
SettingsSamples samples = SettingsSamples.get(clArgs);
@@ -296,7 +300,7 @@ public class StressSettings implements Serializable
System.exit(1);
}
- return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, graph);
+ return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, graph, tokenRange);
}
private static Map<String, String[]> parseMap(String[] args)