You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/03/10 16:26:30 UTC

[2/8] cassandra git commit: Establish and implement canonical bulk reading workload(s)

Establish and implement canonical bulk reading workload(s)

patch by Stefania Alborghetti; reviewed by Jake Luciani for CASSANDRA-10331


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/232e12b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/232e12b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/232e12b9

Branch: refs/heads/cassandra-3.5
Commit: 232e12b94c57f50c9ac91f630c9f29f03317f4cc
Parents: 60b23b1
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Mar 2 17:40:20 2016 +0800
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu Mar 10 10:22:17 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tools/cqlstress-example.yaml                    |  13 +
 tools/stress/README.txt                         |   3 +
 .../org/apache/cassandra/stress/Operation.java  |  92 +------
 .../apache/cassandra/stress/StressAction.java   |  38 ++-
 .../apache/cassandra/stress/StressMetrics.java  |   2 +-
 .../apache/cassandra/stress/StressProfile.java  |  62 ++++-
 .../org/apache/cassandra/stress/StressYaml.java |   8 +
 .../apache/cassandra/stress/WorkManager.java    |   2 +-
 .../stress/generate/TokenRangeIterator.java     |  70 +++++
 .../operations/OpDistributionFactory.java       |   2 +-
 .../stress/operations/PartitionOperation.java   | 130 +++++++++
 .../SampledOpDistributionFactory.java           |  16 +-
 .../predefined/PredefinedOperation.java         |   3 +-
 .../operations/userdefined/SchemaStatement.java |   3 +-
 .../operations/userdefined/TokenRangeQuery.java | 270 +++++++++++++++++++
 .../userdefined/ValidatingSchemaQuery.java      |   3 +-
 .../cassandra/stress/settings/CliOption.java    |   3 +-
 .../settings/SettingsCommandPreDefined.java     |   2 +-
 .../SettingsCommandPreDefinedMixed.java         |   2 +-
 .../stress/settings/SettingsCommandUser.java    |  14 +-
 .../stress/settings/SettingsTokenRange.java     |  77 ++++++
 .../stress/settings/StressSettings.java         |   7 +-
 23 files changed, 698 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 864a33d..7b93083 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
  * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208)
  * Remove recursive call from CompositesSearcher (CASSANDRA-11304)
  * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index 3c60c32..835a4cb 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -93,3 +93,16 @@ queries:
    range1:
       cql: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
       fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
+
+
+#
+# A list of bulk read queries that analytics tools may perform against the schema
+# Each query will sweep an entire token range, page by page.
+#
+token_range_queries:
+  all_columns_tr_query:
+    columns: '*'
+    page_size: 5000
+
+  value_tr_query:
+    columns: value

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/stress/README.txt b/tools/stress/README.txt
index 0046b25..e560c08 100644
--- a/tools/stress/README.txt
+++ b/tools/stress/README.txt
@@ -72,6 +72,9 @@ Primary Options:
         The port to connect to cassandra nodes on
     -sendto:
         Specify a stress server to send this command to
+    -tokenrange:
+        Token range settings
+
 
 Suboptions:
     Every command and primary option has its own collection of suboptions. These are too numerous to list here.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 4123911..8054482 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -18,14 +18,9 @@
 package org.apache.cassandra.stress;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.settings.OptionDistribution;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
 import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -38,45 +33,11 @@ public abstract class Operation
 {
     public final StressSettings settings;
     public final Timer timer;
-    protected final DataSpec spec;
-    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
 
-    private final List<PartitionIterator> partitionCache = new ArrayList<>();
-    protected List<PartitionIterator> partitions;
-
-    public static final class DataSpec
-    {
-        public final PartitionGenerator partitionGenerator;
-        final SeedManager seedManager;
-        final Distribution partitionCount;
-        final RatioDistribution useRatio;
-        final RatioDistribution rowPopulationRatio;
-        final Integer targetCount;
-
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
-        }
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
-        {
-            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
-        }
-        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this.partitionGenerator = partitionGenerator;
-            this.seedManager = seedManager;
-            this.partitionCount = partitionCount;
-            this.useRatio = useRatio;
-            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
-            this.targetCount = targetCount;
-        }
-    }
-
-    public Operation(Timer timer, StressSettings settings, DataSpec spec)
+    public Operation(Timer timer, StressSettings settings)
     {
         this.timer = timer;
         this.settings = settings;
-        this.spec = spec;
     }
 
     public static interface RunOp
@@ -86,48 +47,7 @@ public abstract class Operation
         public int rowCount();
     }
 
-    boolean ready(WorkManager permits, RateLimiter rateLimiter)
-    {
-        int partitionCount = (int) spec.partitionCount.next();
-        if (partitionCount <= 0)
-            return false;
-        partitionCount = permits.takePermits(partitionCount);
-        if (partitionCount <= 0)
-            return false;
-
-        int i = 0;
-        boolean success = true;
-        for (; i < partitionCount && success ; i++)
-        {
-            if (i >= partitionCache.size())
-                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
-
-            success = false;
-            while (!success)
-            {
-                Seed seed = spec.seedManager.next(this);
-                if (seed == null)
-                    break;
-
-                success = reset(seed, partitionCache.get(i));
-            }
-        }
-        partitionCount = i;
-
-        if (rateLimiter != null)
-            rateLimiter.acquire(partitionCount);
-
-        partitions = partitionCache.subList(0, partitionCount);
-        return !partitions.isEmpty();
-    }
-
-    protected boolean reset(Seed seed, PartitionIterator iterator)
-    {
-        if (spec.useRatio == null)
-            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
-        else
-            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
-    }
+    public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
 
     public boolean isWrite()
     {
@@ -200,13 +120,7 @@ public abstract class Operation
 
     }
 
-    private String key()
-    {
-        List<String> keys = new ArrayList<>();
-        for (PartitionIterator partition : partitions)
-            keys.add(partition.getKeyAsString());
-        return keys.toString();
-    }
+    public abstract String key();
 
     protected String getExceptionMessage(Exception e)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 158a278..cda54a0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -72,7 +72,7 @@ public class StressAction implements Runnable
             success = runMulti(settings.rate.auto, rateLimiter);
         else
             success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count,
-                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
 
         if (success)
             output.println("END");
@@ -85,12 +85,12 @@ public class StressAction implements Runnable
     // type provided separately to support recursive call for mixed command with each command type it is performing
     private void warmup(OpDistributionFactory operations)
     {
-        // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
-        int iterations = 50000 * settings.node.nodes.size();
+        // do 25% of iterations as warmup but no more than 50k (by default hotspot compiles methods after 10k invocations)
+        int iterations = (settings.command.count > 0
+                         ? Math.min(50000, (int)(settings.command.count * 0.25))
+                         : 50000) * settings.node.nodes.size();
         int threads = 100;
-        if (iterations > settings.command.count && settings.command.count > 0)
-            return;
 
         if (settings.rate.maxThreads > 0)
             threads = Math.min(threads, settings.rate.maxThreads);
@@ -102,7 +102,7 @@ public class StressAction implements Runnable
             // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
             // so warm up all the nodes we're speaking to only.
             output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
-            run(single, threads, iterations, 0, null, null, warmupOutput);
+            run(single, threads, iterations, 0, null, null, warmupOutput, true);
         }
     }
 
@@ -124,7 +124,7 @@ public class StressAction implements Runnable
                 settings.command.truncateTables(settings);
 
             StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
-                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
             if (result == null)
                 return false;
             results.add(result);
@@ -181,7 +181,14 @@ public class StressAction implements Runnable
         return improvement / count;
     }
 
-    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output)
+    private StressMetrics run(OpDistributionFactory operations,
+                              int threadCount,
+                              long opCount,
+                              long duration,
+                              RateLimiter rateLimiter,
+                              TimeUnit durationUnits,
+                              PrintStream output,
+                              boolean isWarmup)
     {
         output.println(String.format("Running %s with %d threads %s",
                                      operations.desc(),
@@ -199,10 +206,12 @@ public class StressAction implements Runnable
 
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];
+        int sampleCount = settings.samples.liveCount / threadCount;
         for (int i = 0; i < threadCount; i++)
         {
-            consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
-                                        settings.samples.liveCount / threadCount);
+
+            consumers[i] = new Consumer(operations.get(metrics.getTiming(), sampleCount, isWarmup),
+                                        done, workManager, metrics, rateLimiter);
         }
 
         // starting worker threadCount
@@ -259,14 +268,17 @@ public class StressAction implements Runnable
         private final WorkManager workManager;
         private final CountDownLatch done;
 
-        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
-                        RateLimiter rateLimiter, int sampleCount)
+        public Consumer(OpDistribution operations,
+                        CountDownLatch done,
+                        WorkManager workManager,
+                        StressMetrics metrics,
+                        RateLimiter rateLimiter)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
             this.workManager = workManager;
             this.metrics = metrics;
-            this.operations = operations.get(metrics.getTiming(), sampleCount);
+            this.operations = operations;
         }
 
         public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index a640058..a4f280e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -158,7 +158,7 @@ public class StressMetrics
         TimingInterval current = result.intervals.combine(settings.samples.reportCount);
         TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
         rowRateUncertainty.update(current.adjustedRowRate());
-        if (current.partitionCount != 0)
+        if (current.operationCount != 0)
         {
             if (result.intervals.intervals().size() > 1)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 297a004..5243d96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
+import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
 import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
@@ -66,9 +67,11 @@ public class StressProfile implements Serializable
     public String tableName;
     private Map<String, GeneratorConfig> columnConfigs;
     private Map<String, StressYaml.QueryDef> queries;
+    public Map<String, StressYaml.TokenRangeQueryDef> tokenRangeQueries;
     private Map<String, String> insert;
 
     transient volatile TableMetadata tableMetaData;
+    transient volatile Set<TokenRange> tokenRanges;
 
     transient volatile GeneratorFactory generatorFactory;
 
@@ -92,6 +95,7 @@ public class StressProfile implements Serializable
         tableCql = yaml.table_definition;
         seedStr = "seed for stress";
         queries = yaml.queries;
+        tokenRangeQueries = yaml.token_range_queries;
         insert = yaml.insert;
 
         extraSchemaDefinitions = yaml.extra_definitions;
@@ -100,6 +104,10 @@ public class StressProfile implements Serializable
         assert tableName != null : "table name is required in yaml file";
         assert queries != null : "queries map is required in yaml file";
 
+        for (String query : queries.keySet())
+            assert !tokenRangeQueries.containsKey(query) :
+                String.format("Found %s in both queries and token_range_queries, please use different names", query);
+
         if (keyspaceCql != null && keyspaceCql.length() > 0)
         {
             try
@@ -254,8 +262,48 @@ public class StressProfile implements Serializable
         }
     }
 
-    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings)
+    public Set<TokenRange> maybeLoadTokenRanges(StressSettings settings)
     {
+        maybeLoadSchemaInfo(settings); // ensure table metadata is available
+
+        JavaDriverClient client = settings.getJavaDriverClient();
+        synchronized (client)
+        {
+            if (tokenRanges != null)
+                return tokenRanges;
+
+            Cluster cluster = client.getCluster();
+            Metadata metadata = cluster.getMetadata();
+            if (metadata == null)
+                throw new RuntimeException("Unable to get metadata");
+
+            List<TokenRange> sortedRanges = new ArrayList<>(metadata.getTokenRanges().size() + 1);
+            for (TokenRange range : metadata.getTokenRanges())
+            {
+                //if we don't unwrap we miss the partitions between ring min and smallest range start value
+                if (range.isWrappedAround())
+                    sortedRanges.addAll(range.unwrap());
+                else
+                    sortedRanges.add(range);
+            }
+
+            Collections.sort(sortedRanges);
+            tokenRanges = new LinkedHashSet<>(sortedRanges);
+            return tokenRanges;
+        }
+    }
+
+    public Operation getQuery(String name,
+                              Timer timer,
+                              PartitionGenerator generator,
+                              SeedManager seeds,
+                              StressSettings settings,
+                              boolean isWarmup)
+    {
+        name = name.toLowerCase();
+        if (!queries.containsKey(name))
+            throw new IllegalArgumentException("No query defined with name " + name);
+
         if (queryStatements == null)
         {
             synchronized (this)
@@ -296,13 +344,19 @@ public class StressProfile implements Serializable
             }
         }
 
-        name = name.toLowerCase();
-        if (!queryStatements.containsKey(name))
-            throw new IllegalArgumentException("No query defined with name " + name);
         return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name),
                                ThriftConversion.fromThrift(settings.command.consistencyLevel), argSelects.get(name));
     }
 
+    public Operation getBulkReadQueries(String name, Timer timer, StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean isWarmup)
+    {
+        StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name);
+        if (def == null)
+            throw new IllegalArgumentException("No bulk read query defined with name " + name);
+
+        return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
+    }
+
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index 90797b4..214e56a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -20,6 +20,7 @@
  */
 package org.apache.cassandra.stress;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +36,7 @@ public class StressYaml
     public List<Map<String, Object>> columnspec;
     public Map<String, QueryDef> queries;
     public Map<String, String> insert;
+    public Map<String, TokenRangeQueryDef> token_range_queries = new HashMap<>();
 
     public static class QueryDef
     {
@@ -42,4 +44,10 @@ public class StressYaml
         public String fields;
     }
 
+    public static class TokenRangeQueryDef
+    {
+        public String columns;
+        public int page_size = 5000;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
index c6a3eee..78d4176 100644
--- a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -2,7 +2,7 @@ package org.apache.cassandra.stress;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-interface WorkManager
+public interface WorkManager
 {
     // -1 indicates consumer should terminate
     int takePermits(int count);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
new file mode 100644
index 0000000..5ddac61
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.generate;
+
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.settings.StressSettings;
+
+public class TokenRangeIterator
+{
+    private final Set<TokenRange> tokenRanges;
+    private final ConcurrentLinkedQueue<TokenRange> pendingRanges;
+    private final boolean wrap;
+
+    public TokenRangeIterator(StressSettings settings, Set<TokenRange> tokenRanges)
+    {
+        this.tokenRanges = maybeSplitRanges(tokenRanges, settings.tokenRange.splitFactor);
+        this.pendingRanges = new ConcurrentLinkedQueue<>(this.tokenRanges);
+        this.wrap = settings.tokenRange.wrap;
+    }
+
+    private static Set<TokenRange> maybeSplitRanges(Set<TokenRange> tokenRanges, int splitFactor)
+    {
+        if (splitFactor <= 1)
+            return tokenRanges;
+
+        Set<TokenRange> ret = new TreeSet<>();
+        for (TokenRange range : tokenRanges)
+            ret.addAll(range.splitEvenly(splitFactor));
+
+        return ret;
+    }
+
+    public void update()
+    {
+        // we may race and add to the queue twice but no bad consequence so it's fine if that happens
+        // as ultimately only the permits determine when to stop if wrap is true
+        if (wrap && pendingRanges.isEmpty())
+            pendingRanges.addAll(tokenRanges);
+    }
+
+    public TokenRange next()
+    {
+        return pendingRanges.poll();
+    }
+
+    public boolean exhausted()
+    {
+        return pendingRanges.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 7e13fcd..5fbb0f9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.stress.util.Timing;
 
 public interface OpDistributionFactory
 {
-    public OpDistribution get(Timing timing, int sampleCount);
+    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup);
     public String desc();
     Iterable<OpDistributionFactory> each();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
new file mode 100644
index 0000000..45c36f2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.PartitionIterator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Seed;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public abstract class PartitionOperation extends Operation
+{
+    protected final DataSpec spec;
+    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
+
+    private final List<PartitionIterator> partitionCache = new ArrayList<>();
+    protected List<PartitionIterator> partitions;
+
+    public static final class DataSpec
+    {
+        public final PartitionGenerator partitionGenerator;
+        final SeedManager seedManager;
+        final Distribution partitionCount;
+        final RatioDistribution useRatio;
+        final RatioDistribution rowPopulationRatio;
+        final Integer targetCount;
+
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
+        }
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
+        {
+            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
+        }
+        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this.partitionGenerator = partitionGenerator;
+            this.seedManager = seedManager;
+            this.partitionCount = partitionCount;
+            this.useRatio = useRatio;
+            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
+            this.targetCount = targetCount;
+        }
+    }
+
+    public PartitionOperation(Timer timer, StressSettings settings, DataSpec spec)
+    {
+        super(timer, settings);
+        this.spec = spec;
+    }
+
+    public boolean ready(WorkManager permits, RateLimiter rateLimiter)
+    {
+        int partitionCount = (int) spec.partitionCount.next();
+        if (partitionCount <= 0)
+            return false;
+        partitionCount = permits.takePermits(partitionCount);
+        if (partitionCount <= 0)
+            return false;
+
+        int i = 0;
+        boolean success = true;
+        for (; i < partitionCount && success ; i++)
+        {
+            if (i >= partitionCache.size())
+                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
+
+            success = false;
+            while (!success)
+            {
+                Seed seed = spec.seedManager.next(this);
+                if (seed == null)
+                    break;
+
+                success = reset(seed, partitionCache.get(i));
+            }
+        }
+        partitionCount = i;
+
+        if (rateLimiter != null)
+            rateLimiter.acquire(partitionCount);
+
+        partitions = partitionCache.subList(0, partitionCount);
+        return !partitions.isEmpty();
+    }
+
+    protected boolean reset(Seed seed, PartitionIterator iterator)
+    {
+        if (spec.useRatio == null)
+            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
+        else
+            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
+    }
+
+    public String key()
+    {
+        List<String> keys = new ArrayList<>();
+        for (PartitionIterator partition : partitions)
+            keys.add(partition.getKeyAsString());
+        return keys.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 194f84f..a10585d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -44,16 +44,17 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         this.clustering = clustering;
     }
 
-    protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key);
+    protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
     protected abstract PartitionGenerator newGenerator();
 
-    public OpDistribution get(Timing timing, int sampleCount)
+    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
     {
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
         {
-            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey());
+            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                                                generator, ratio.getKey(), isWarmup);
             for (Operation op : ops)
                 operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
         }
@@ -75,15 +76,18 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         {
             out.add(new OpDistributionFactory()
             {
-                public OpDistribution get(Timing timing, int sampleCount)
+                public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
                 {
-                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey());
+                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                                                                                          newGenerator(),
+                                                                                          ratio.getKey(),
+                                                                                          isWarmup);
                     if (ops.size() == 1)
                         return new FixedOpDistribution(ops.get(0));
                     List<Pair<Operation, Double>> ratios = new ArrayList<>();
                     for (Operation op : ops)
                         ratios.add(new Pair<>(op, 1d / ops.size()));
-                    return new SampledOpDistribution(new EnumeratedDistribution<Operation>(ratios), new DistributionFixed(1));
+                    return new SampledOpDistribution(new EnumeratedDistribution<>(ratios), new DistributionFixed(1));
                 }
 
                 public String desc()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index 66f232a..3767401 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.StressSettings;
@@ -32,7 +33,7 @@ import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 
-public abstract class PredefinedOperation extends Operation
+public abstract class PredefinedOperation extends PartitionOperation
 {
     public final Command type;
     private final Distribution columnCount;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 49891ec..c9ead12 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -34,12 +34,13 @@ import com.datastax.driver.core.PreparedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.transport.SimpleClient;
 
-public abstract class SchemaStatement extends Operation
+public abstract class SchemaStatement extends PartitionOperation
 {
 
     final PreparedStatement statement;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
new file mode 100644
index 0000000..60a6c48
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.naming.OperationNotSupportedException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressYaml;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+public class TokenRangeQuery extends Operation
+{
+    private final ThreadLocal<State> currentState = new ThreadLocal<>();
+
+    private final TableMetadata tableMetadata;
+    private final TokenRangeIterator tokenRangeIterator;
+    private final String columns;
+    private final int pageSize;
+    private final boolean isWarmup;
+
+    public TokenRangeQuery(Timer timer,
+                           StressSettings settings,
+                           TableMetadata tableMetadata,
+                           TokenRangeIterator tokenRangeIterator,
+                           StressYaml.TokenRangeQueryDef def,
+                           boolean isWarmup)
+    {
+        super(timer, settings);
+        this.tableMetadata = tableMetadata;
+        this.tokenRangeIterator = tokenRangeIterator;
+        this.columns = sanitizeColumns(def.columns, tableMetadata);
+        this.pageSize = isWarmup ? Math.min(100, def.page_size) : def.page_size;
+        this.isWarmup = isWarmup;
+    }
+
+    /**
+     * We need to specify the columns by name because we need to add token(partition_keys) in order to count
+     * partitions. So if the user specifies '*' then replace it with a list of all columns.
+     */
+    private static String sanitizeColumns(String columns, TableMetadata tableMetadata)
+    {
+        if (!columns.equals("*"))
+            return columns;
+
+        return String.join(", ", tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList()));
+    }
+
+    /**
+     * The state of a token range currently being retrieved.
+     * Here we store the paging state to retrieve more pages
+     * and we keep track of which partitions have already been retrieved,
+     */
+    private final static class State
+    {
+        public final TokenRange tokenRange;
+        public final String query;
+        public PagingState pagingState;
+        public Set<Token> partitions = new HashSet<>();
+
+        public State(TokenRange tokenRange, String query)
+        {
+            this.tokenRange = tokenRange;
+            this.query = query;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[%s, %s]", tokenRange.getStart(), tokenRange.getEnd());
+        }
+    }
+
+    abstract static class Runner implements RunOp
+    {
+        int partitionCount;
+        int rowCount;
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            State state = currentState.get();
+            if (state == null)
+            { // start processing a new token range
+                TokenRange range = tokenRangeIterator.next();
+                if (range == null)
+                    return true; // no more token ranges to process
+
+                state = new State(range, buildQuery(range));
+                currentState.set(state);
+            }
+
+            ResultSet results;
+            Statement statement = new SimpleStatement(state.query);
+            statement.setFetchSize(pageSize);
+
+            if (state.pagingState != null)
+                statement.setPagingState(state.pagingState);
+
+            results = client.getSession().execute(statement);
+            state.pagingState = results.getExecutionInfo().getPagingState();
+
+            int remaining = results.getAvailableWithoutFetching();
+            rowCount += remaining;
+
+            for (Row row : results)
+            {
+                // this call will only succeed if we've added token(partition keys) to the query
+                Token partition = row.getPartitionKeyToken();
+                if (!state.partitions.contains(partition))
+                {
+                    partitionCount += 1;
+                    state.partitions.add(partition);
+                }
+
+                if (--remaining == 0)
+                    break;
+            }
+
+            if (results.isExhausted() || isWarmup)
+            { // no more pages to fetch or just warming up, ready to move on to another token range
+                currentState.set(null);
+            }
+
+            return true;
+        }
+    }
+
+    private String buildQuery(TokenRange tokenRange)
+    {
+        Token start = tokenRange.getStart();
+        Token end = tokenRange.getEnd();
+        List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+        String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));
+
+        StringBuilder ret = new StringBuilder();
+        ret.append("SELECT ");
+        ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
+        ret.append(", ");
+        ret.append(columns);
+        ret.append(" FROM ");
+        ret.append(tableMetadata.getName());
+        if (start != null || end != null)
+            ret.append(" WHERE ");
+        if (start != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" > ");
+            ret.append(start.toString());
+        }
+
+        if (start != null && end != null)
+            ret.append(" AND ");
+
+        if (end != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" <= ");
+            ret.append(end.toString());
+        }
+
+        return ret.toString();
+    }
+
+    private static class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            throw new OperationNotSupportedException("Bulk read over thrift not supported");
+        }
+    }
+
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+    public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
+    {
+        tokenRangeIterator.update();
+
+        if (tokenRangeIterator.exhausted() && currentState.get() == null)
+            return false;
+
+        int numLeft = workManager.takePermits(1);
+        if (rateLimiter != null && numLeft > 0 )
+            rateLimiter.acquire(numLeft);
+
+        return numLeft > 0;
+    }
+
+    public String key()
+    {
+        State state = currentState.get();
+        return state == null ? "-" : state.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index 8bdde51..02a9ca8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
@@ -46,7 +47,7 @@ import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.utils.Pair;
 import org.apache.thrift.TException;
 
-public class ValidatingSchemaQuery extends Operation
+public class ValidatingSchemaQuery extends PartitionOperation
 {
     final Random random = new Random();
     private Pair<Row, Row> bounds;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 5ec56dc..eb286ee 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -38,7 +38,8 @@ public enum CliOption
     LOG("Where to log progress to, and the interval at which to do it", SettingsLog.helpPrinter()),
     TRANSPORT("Custom transport factories", SettingsTransport.helpPrinter()),
     PORT("The port to connect to cassandra nodes on", SettingsPort.helpPrinter()),
-    SENDTO("-send-to", "Specify a stress server to send this command to", SettingsMisc.sendToDaemonHelpPrinter())
+    SENDTO("-send-to", "Specify a stress server to send this command to", SettingsMisc.sendToDaemonHelpPrinter()),
+    TOKENRANGE("Token range settings", SettingsTokenRange.helpPrinter())
     ;
 
     private static final Map<String, CliOption> LOOKUP;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index 83f444c..c2f2591 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -51,7 +51,7 @@ public class SettingsCommandPreDefined extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
-            public OpDistribution get(Timing timing, int sampleCount)
+            public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
             {
                 return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
                                                newGenerator(settings), seeds, settings, add));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 861b1a4..dd11452 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -55,7 +55,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
         final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<Command>(ratios, clustering)
         {
-            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key)
+            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key, boolean isWarmup)
             {
                 return Collections.singletonList(PredefinedOperation.operation(key, timer, generator, seeds, settings, add));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index 8440e8e..36cbefe 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.stress.StressProfile;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
 import org.apache.cassandra.stress.util.Timer;
@@ -69,15 +70,24 @@ public class SettingsCommandUser extends SettingsCommand
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
         final SeedManager seeds = new SeedManager(settings);
+        final TokenRangeIterator tokenRangeIterator = profile.tokenRangeQueries.isEmpty()
+                                                      ? null
+                                                      : new TokenRangeIterator(settings,
+                                                                               profile.maybeLoadTokenRanges(settings));
+
         return new SampledOpDistributionFactory<String>(ratios, clustering)
         {
-            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key)
+            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key, boolean isWarmup)
             {
                 if (key.equalsIgnoreCase("insert"))
                     return Collections.singletonList(profile.getInsert(timer, generator, seeds, settings));
                 if (key.equalsIgnoreCase("validate"))
                     return profile.getValidate(timer, generator, seeds, settings);
-                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings));
+
+                if (profile.tokenRangeQueries.containsKey(key))
+                    return Collections.singletonList(profile.getBulkReadQueries(key, timer, settings, tokenRangeIterator, isWarmup));
+
+                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings, isWarmup));
             }
 
             protected PartitionGenerator newGenerator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
new file mode 100644
index 0000000..8fb0048
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.settings;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+
+public class SettingsTokenRange
+{
+    public final boolean wrap;
+    public final int splitFactor;
+
+    public SettingsTokenRange(TokenRangeOptions options)
+    {
+        this.wrap = options.wrap.setByUser();
+        this.splitFactor = Ints.checkedCast(OptionDistribution.parseLong(options.splitFactor.value()));
+    }
+
+    private static final class TokenRangeOptions extends GroupedOptions
+    {
+        final OptionSimple wrap = new OptionSimple("wrap", "", null, "Re-use token ranges in order to terminate stress iterations", false);
+        final OptionSimple splitFactor = new OptionSimple("split-factor=", "[0-9]+[bmk]?", "1", "Split every token range by this factor", false);
+
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return ImmutableList.<Option>builder().add(wrap, splitFactor).build();
+        }
+    }
+
+    public static SettingsTokenRange get(Map<String, String[]> clArgs)
+    {
+        String[] params = clArgs.remove("-tokenrange");
+        if (params == null)
+        {
+            return new SettingsTokenRange(new TokenRangeOptions());
+        }
+        TokenRangeOptions options = GroupedOptions.select(params, new TokenRangeOptions());
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -tokenrange options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsTokenRange(options);
+    }
+
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-tokenrange", new TokenRangeOptions());
+    }
+
+    public static Runnable helpPrinter()
+    {
+        return SettingsTokenRange::printHelp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/232e12b9/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index 335ca92..5b1f861 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -55,7 +55,8 @@ public class StressSettings implements Serializable
     public final SettingsPort port;
     public final String sendToDaemon;
 
-    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsSamples samples, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
+    public final SettingsTokenRange tokenRange;
+    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsSamples samples, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon, SettingsTokenRange tokenRange)
     {
         this.command = command;
         this.rate = rate;
@@ -71,6 +72,7 @@ public class StressSettings implements Serializable
         this.transport = transport;
         this.port = port;
         this.sendToDaemon = sendToDaemon;
+        this.tokenRange = tokenRange;
     }
 
     private SmartThriftClient tclient;
@@ -253,6 +255,7 @@ public class StressSettings implements Serializable
         SettingsPort port = SettingsPort.get(clArgs);
         SettingsRate rate = SettingsRate.get(clArgs, command);
         SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
+        SettingsTokenRange tokenRange = SettingsTokenRange.get(clArgs);
         SettingsInsert insert = SettingsInsert.get(clArgs);
         SettingsColumn columns = SettingsColumn.get(clArgs);
         SettingsSamples samples = SettingsSamples.get(clArgs);
@@ -278,7 +281,7 @@ public class StressSettings implements Serializable
             }
             System.exit(1);
         }
-        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon);
+        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, tokenRange);
     }
 
     private static Map<String, String[]> parseMap(String[] args)