You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/03/10 16:26:35 UTC

[7/8] cassandra git commit: Establish and implement canonical bulk reading workload(s)

Establish and implement canonical bulk reading workload(s)

patch by Stefania Alborghetti; reviewed by Jake Luciani for CASSANDRA-10331


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f27ab290
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f27ab290
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f27ab290

Branch: refs/heads/cassandra-3.5
Commit: f27ab2908a06056b332e3a87008f0f8560a9620b
Parents: 9fd8dfc
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Mar 2 17:40:20 2016 +0800
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu Mar 10 10:23:51 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tools/cqlstress-example.yaml                    |  13 +
 tools/stress/README.txt                         |   5 +
 .../org/apache/cassandra/stress/Operation.java  |  91 +------
 .../apache/cassandra/stress/StressAction.java   |  38 ++-
 .../apache/cassandra/stress/StressMetrics.java  |   2 +-
 .../apache/cassandra/stress/StressProfile.java  |  62 ++++-
 .../org/apache/cassandra/stress/StressYaml.java |   8 +
 .../apache/cassandra/stress/WorkManager.java    |   2 +-
 .../stress/generate/TokenRangeIterator.java     |  70 +++++
 .../operations/OpDistributionFactory.java       |   2 +-
 .../stress/operations/PartitionOperation.java   | 130 +++++++++
 .../SampledOpDistributionFactory.java           |  16 +-
 .../predefined/PredefinedOperation.java         |   3 +-
 .../operations/userdefined/SchemaStatement.java |   3 +-
 .../operations/userdefined/TokenRangeQuery.java | 270 +++++++++++++++++++
 .../userdefined/ValidatingSchemaQuery.java      |   3 +-
 .../cassandra/stress/settings/CliOption.java    |   3 +-
 .../settings/SettingsCommandPreDefined.java     |   2 +-
 .../SettingsCommandPreDefinedMixed.java         |   2 +-
 .../stress/settings/SettingsCommandUser.java    |  14 +-
 .../stress/settings/SettingsTokenRange.java     |  77 ++++++
 .../stress/settings/StressSettings.java         |   8 +-
 23 files changed, 701 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58c7ed0..68f5714 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.5
 Merged from 3.0:
+ * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
  * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208)
  * Remove recursive call from CompositesSearcher (CASSANDRA-11304)
  * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index 3c60c32..835a4cb 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -93,3 +93,16 @@ queries:
    range1:
       cql: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
       fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
+
+
+#
+# A list of bulk read queries that analytics tools may perform against the schema
+# Each query will sweep an entire token range, page by page.
+#
+token_range_queries:
+  all_columns_tr_query:
+    columns: '*'
+    page_size: 5000
+
+  value_tr_query:
+    columns: value

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/stress/README.txt b/tools/stress/README.txt
index 0046b25..aa89dab 100644
--- a/tools/stress/README.txt
+++ b/tools/stress/README.txt
@@ -72,6 +72,11 @@ Primary Options:
         The port to connect to cassandra nodes on
     -sendto:
         Specify a stress server to send this command to
+    -graph:
+        Graph recorded metrics
+    -tokenrange:
+        Token range settings
+
 
 Suboptions:
     Every command and primary option has its own collection of suboptions. These are too numerous to list here.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 139dd53..0f13d3c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,13 +19,9 @@
 package org.apache.cassandra.stress;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
 import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -38,45 +34,11 @@ public abstract class Operation
 {
     public final StressSettings settings;
     public final Timer timer;
-    protected final DataSpec spec;
-    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
 
-    private final List<PartitionIterator> partitionCache = new ArrayList<>();
-    protected List<PartitionIterator> partitions;
-
-    public static final class DataSpec
-    {
-        public final PartitionGenerator partitionGenerator;
-        final SeedManager seedManager;
-        final Distribution partitionCount;
-        final RatioDistribution useRatio;
-        final RatioDistribution rowPopulationRatio;
-        final Integer targetCount;
-
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
-        }
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
-        {
-            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
-        }
-        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this.partitionGenerator = partitionGenerator;
-            this.seedManager = seedManager;
-            this.partitionCount = partitionCount;
-            this.useRatio = useRatio;
-            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
-            this.targetCount = targetCount;
-        }
-    }
-
-    public Operation(Timer timer, StressSettings settings, DataSpec spec)
+    public Operation(Timer timer, StressSettings settings)
     {
         this.timer = timer;
         this.settings = settings;
-        this.spec = spec;
     }
 
     public static interface RunOp
@@ -86,48 +48,7 @@ public abstract class Operation
         public int rowCount();
     }
 
-    boolean ready(WorkManager permits, RateLimiter rateLimiter)
-    {
-        int partitionCount = (int) spec.partitionCount.next();
-        if (partitionCount <= 0)
-            return false;
-        partitionCount = permits.takePermits(partitionCount);
-        if (partitionCount <= 0)
-            return false;
-
-        int i = 0;
-        boolean success = true;
-        for (; i < partitionCount && success ; i++)
-        {
-            if (i >= partitionCache.size())
-                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
-
-            success = false;
-            while (!success)
-            {
-                Seed seed = spec.seedManager.next(this);
-                if (seed == null)
-                    break;
-
-                success = reset(seed, partitionCache.get(i));
-            }
-        }
-        partitionCount = i;
-
-        if (rateLimiter != null)
-            rateLimiter.acquire(partitionCount);
-
-        partitions = partitionCache.subList(0, partitionCount);
-        return !partitions.isEmpty();
-    }
-
-    protected boolean reset(Seed seed, PartitionIterator iterator)
-    {
-        if (spec.useRatio == null)
-            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
-        else
-            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
-    }
+    public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
 
     public boolean isWrite()
     {
@@ -202,13 +123,7 @@ public abstract class Operation
 
     }
 
-    private String key()
-    {
-        List<String> keys = new ArrayList<>();
-        for (PartitionIterator partition : partitions)
-            keys.add(partition.getKeyAsString());
-        return keys.toString();
-    }
+    public abstract String key();
 
     protected String getExceptionMessage(Exception e)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 3150e14..ebe6270 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -71,7 +71,7 @@ public class StressAction implements Runnable
             success = runMulti(settings.rate.auto, rateLimiter);
         else
             success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count,
-                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
 
         if (success)
             output.println("END");
@@ -84,12 +84,12 @@ public class StressAction implements Runnable
     // type provided separately to support recursive call for mixed command with each command type it is performing
     private void warmup(OpDistributionFactory operations)
     {
-        // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
-        int iterations = 50000 * settings.node.nodes.size();
+        // do 25% of iterations as warmup but no more than 50k (by default hotspot compiles methods after 10k invocations)
+        int iterations = (settings.command.count > 0
+                         ? Math.min(50000, (int)(settings.command.count * 0.25))
+                         : 50000) * settings.node.nodes.size();
         int threads = 100;
-        if (iterations > settings.command.count && settings.command.count > 0)
-            return;
 
         if (settings.rate.maxThreads > 0)
             threads = Math.min(threads, settings.rate.maxThreads);
@@ -101,7 +101,7 @@ public class StressAction implements Runnable
             // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
             // so warm up all the nodes we're speaking to only.
             output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
-            run(single, threads, iterations, 0, null, null, warmupOutput);
+            run(single, threads, iterations, 0, null, null, warmupOutput, true);
         }
     }
 
@@ -124,7 +124,7 @@ public class StressAction implements Runnable
                 settings.command.truncateTables(settings);
 
             StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
-                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
             if (result == null)
                 return false;
             results.add(result);
@@ -181,7 +181,14 @@ public class StressAction implements Runnable
         return improvement / count;
     }
 
-    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output)
+    private StressMetrics run(OpDistributionFactory operations,
+                              int threadCount,
+                              long opCount,
+                              long duration,
+                              RateLimiter rateLimiter,
+                              TimeUnit durationUnits,
+                              PrintStream output,
+                              boolean isWarmup)
     {
         output.println(String.format("Running %s with %d threads %s",
                                      operations.desc(),
@@ -199,10 +206,12 @@ public class StressAction implements Runnable
 
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];
+        int sampleCount = settings.samples.liveCount / threadCount;
         for (int i = 0; i < threadCount; i++)
         {
-            consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
-                                        settings.samples.liveCount / threadCount);
+
+            consumers[i] = new Consumer(operations.get(metrics.getTiming(), sampleCount, isWarmup),
+                                        done, workManager, metrics, rateLimiter);
         }
 
         // starting worker threadCount
@@ -259,14 +268,17 @@ public class StressAction implements Runnable
         private final WorkManager workManager;
         private final CountDownLatch done;
 
-        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
-                        RateLimiter rateLimiter, int sampleCount)
+        public Consumer(OpDistribution operations,
+                        CountDownLatch done,
+                        WorkManager workManager,
+                        StressMetrics metrics,
+                        RateLimiter rateLimiter)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
             this.workManager = workManager;
             this.metrics = metrics;
-            this.operations = operations.get(metrics.getTiming(), sampleCount);
+            this.operations = operations;
         }
 
         public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 0190ed8..3585a00 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -158,7 +158,7 @@ public class StressMetrics
         TimingInterval current = result.intervals.combine(settings.samples.reportCount);
         TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
         rowRateUncertainty.update(current.adjustedRowRate());
-        if (current.partitionCount != 0)
+        if (current.operationCount != 0)
         {
             if (result.intervals.intervals().size() > 1)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 297a004..5243d96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
+import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
 import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
@@ -66,9 +67,11 @@ public class StressProfile implements Serializable
     public String tableName;
     private Map<String, GeneratorConfig> columnConfigs;
     private Map<String, StressYaml.QueryDef> queries;
+    public Map<String, StressYaml.TokenRangeQueryDef> tokenRangeQueries;
     private Map<String, String> insert;
 
     transient volatile TableMetadata tableMetaData;
+    transient volatile Set<TokenRange> tokenRanges;
 
     transient volatile GeneratorFactory generatorFactory;
 
@@ -92,6 +95,7 @@ public class StressProfile implements Serializable
         tableCql = yaml.table_definition;
         seedStr = "seed for stress";
         queries = yaml.queries;
+        tokenRangeQueries = yaml.token_range_queries;
         insert = yaml.insert;
 
         extraSchemaDefinitions = yaml.extra_definitions;
@@ -100,6 +104,10 @@ public class StressProfile implements Serializable
         assert tableName != null : "table name is required in yaml file";
         assert queries != null : "queries map is required in yaml file";
 
+        for (String query : queries.keySet())
+            assert !tokenRangeQueries.containsKey(query) :
+                String.format("Found %s in both queries and token_range_queries, please use different names", query);
+
         if (keyspaceCql != null && keyspaceCql.length() > 0)
         {
             try
@@ -254,8 +262,48 @@ public class StressProfile implements Serializable
         }
     }
 
-    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings)
+    public Set<TokenRange> maybeLoadTokenRanges(StressSettings settings)
     {
+        maybeLoadSchemaInfo(settings); // ensure table metadata is available
+
+        JavaDriverClient client = settings.getJavaDriverClient();
+        synchronized (client)
+        {
+            if (tokenRanges != null)
+                return tokenRanges;
+
+            Cluster cluster = client.getCluster();
+            Metadata metadata = cluster.getMetadata();
+            if (metadata == null)
+                throw new RuntimeException("Unable to get metadata");
+
+            List<TokenRange> sortedRanges = new ArrayList<>(metadata.getTokenRanges().size() + 1);
+            for (TokenRange range : metadata.getTokenRanges())
+            {
+                //if we don't unwrap we miss the partitions between ring min and smallest range start value
+                if (range.isWrappedAround())
+                    sortedRanges.addAll(range.unwrap());
+                else
+                    sortedRanges.add(range);
+            }
+
+            Collections.sort(sortedRanges);
+            tokenRanges = new LinkedHashSet<>(sortedRanges);
+            return tokenRanges;
+        }
+    }
+
+    public Operation getQuery(String name,
+                              Timer timer,
+                              PartitionGenerator generator,
+                              SeedManager seeds,
+                              StressSettings settings,
+                              boolean isWarmup)
+    {
+        name = name.toLowerCase();
+        if (!queries.containsKey(name))
+            throw new IllegalArgumentException("No query defined with name " + name);
+
         if (queryStatements == null)
         {
             synchronized (this)
@@ -296,13 +344,19 @@ public class StressProfile implements Serializable
             }
         }
 
-        name = name.toLowerCase();
-        if (!queryStatements.containsKey(name))
-            throw new IllegalArgumentException("No query defined with name " + name);
         return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name),
                                ThriftConversion.fromThrift(settings.command.consistencyLevel), argSelects.get(name));
     }
 
+    public Operation getBulkReadQueries(String name, Timer timer, StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean isWarmup)
+    {
+        StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name);
+        if (def == null)
+            throw new IllegalArgumentException("No bulk read query defined with name " + name);
+
+        return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
+    }
+
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index 90797b4..214e56a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -20,6 +20,7 @@
  */
 package org.apache.cassandra.stress;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +36,7 @@ public class StressYaml
     public List<Map<String, Object>> columnspec;
     public Map<String, QueryDef> queries;
     public Map<String, String> insert;
+    public Map<String, TokenRangeQueryDef> token_range_queries = new HashMap<>();
 
     public static class QueryDef
     {
@@ -42,4 +44,10 @@ public class StressYaml
         public String fields;
     }
 
+    public static class TokenRangeQueryDef
+    {
+        public String columns;
+        public int page_size = 5000;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
index c6a3eee..78d4176 100644
--- a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -2,7 +2,7 @@ package org.apache.cassandra.stress;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-interface WorkManager
+public interface WorkManager
 {
     // -1 indicates consumer should terminate
     int takePermits(int count);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
new file mode 100644
index 0000000..5ddac61
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.generate;
+
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.settings.StressSettings;
+
+public class TokenRangeIterator
+{
+    private final Set<TokenRange> tokenRanges;
+    private final ConcurrentLinkedQueue<TokenRange> pendingRanges;
+    private final boolean wrap;
+
+    public TokenRangeIterator(StressSettings settings, Set<TokenRange> tokenRanges)
+    {
+        this.tokenRanges = maybeSplitRanges(tokenRanges, settings.tokenRange.splitFactor);
+        this.pendingRanges = new ConcurrentLinkedQueue<>(this.tokenRanges);
+        this.wrap = settings.tokenRange.wrap;
+    }
+
+    private static Set<TokenRange> maybeSplitRanges(Set<TokenRange> tokenRanges, int splitFactor)
+    {
+        if (splitFactor <= 1)
+            return tokenRanges;
+
+        Set<TokenRange> ret = new TreeSet<>();
+        for (TokenRange range : tokenRanges)
+            ret.addAll(range.splitEvenly(splitFactor));
+
+        return ret;
+    }
+
+    public void update()
+    {
+        // we may race and add to the queue twice but no bad consequence so it's fine if that happens
+        // as ultimately only the permits determine when to stop if wrap is true
+        if (wrap && pendingRanges.isEmpty())
+            pendingRanges.addAll(tokenRanges);
+    }
+
+    public TokenRange next()
+    {
+        return pendingRanges.poll();
+    }
+
+    public boolean exhausted()
+    {
+        return pendingRanges.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 7e13fcd..5fbb0f9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.stress.util.Timing;
 
 public interface OpDistributionFactory
 {
-    public OpDistribution get(Timing timing, int sampleCount);
+    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup);
     public String desc();
     Iterable<OpDistributionFactory> each();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
new file mode 100644
index 0000000..45c36f2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.PartitionIterator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Seed;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public abstract class PartitionOperation extends Operation // base for operations that work on a batch of partitions drawn from a SeedManager
+{
+    protected final DataSpec spec;
+    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get(); // used when a DataSpec is given a null rowPopulationRatio
+
+    private final List<PartitionIterator> partitionCache = new ArrayList<>(); // iterators are created lazily and re-used across ready() calls
+    protected List<PartitionIterator> partitions; // sub-list view of partitionCache, assigned by ready()
+
+    public static final class DataSpec // bundles the generator, seed source and distributions that ready()/reset() read
+    {
+        public final PartitionGenerator partitionGenerator;
+        final SeedManager seedManager;
+        final Distribution partitionCount;
+        final RatioDistribution useRatio; // null when built through the targetCount constructor
+        final RatioDistribution rowPopulationRatio;
+        final Integer targetCount; // null when built through the useRatio constructor
+
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount); // target-count variant: useRatio stays null
+        }
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
+        {
+            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null); // use-ratio variant: targetCount stays null
+        }
+        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this.partitionGenerator = partitionGenerator;
+            this.seedManager = seedManager;
+            this.partitionCount = partitionCount;
+            this.useRatio = useRatio;
+            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio; // null falls back to the default ratio
+            this.targetCount = targetCount;
+        }
+    }
+
+    public PartitionOperation(Timer timer, StressSettings settings, DataSpec spec)
+    {
+        super(timer, settings);
+        this.spec = spec;
+    }
+
+    public boolean ready(WorkManager permits, RateLimiter rateLimiter) // selects and resets the partitions for the next run; false means nothing to do
+    {
+        int partitionCount = (int) spec.partitionCount.next(); // number of partitions this run would like to use
+        if (partitionCount <= 0)
+            return false;
+        partitionCount = permits.takePermits(partitionCount); // the work manager decides how many are actually granted
+        if (partitionCount <= 0)
+            return false;
+
+        int i = 0;
+        boolean success = true;
+        for (; i < partitionCount && success ; i++) // stops early once a slot could not be reset
+        {
+            if (i >= partitionCache.size()) // grow the cache only when this slot has never been used
+                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
+
+            success = false;
+            while (!success) // keep drawing seeds until one resets this slot's iterator
+            {
+                Seed seed = spec.seedManager.next(this);
+                if (seed == null) // no seed returned: give up on this slot, which also ends the outer loop
+                    break;
+
+                success = reset(seed, partitionCache.get(i));
+            }
+        }
+        partitionCount = i; // NOTE(review): i is also incremented for a slot that ran out of seeds, so that slot is included below - confirm intended
+
+        if (rateLimiter != null)
+            rateLimiter.acquire(partitionCount); // one rate limiter permit per partition
+
+        partitions = partitionCache.subList(0, partitionCount);
+        return !partitions.isEmpty();
+    }
+
+    protected boolean reset(Seed seed, PartitionIterator iterator) // points the iterator at seed, sized by targetCount or by useRatio
+    {
+        if (spec.useRatio == null) // the spec carries no use ratio, so targetCount is passed instead
+            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
+        else
+            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
+    }
+
+    public String key() // string form of the list of keys of the current partitions
+    {
+        List<String> keys = new ArrayList<>();
+        for (PartitionIterator partition : partitions)
+            keys.add(partition.getKeyAsString());
+        return keys.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 194f84f..a10585d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -44,16 +44,17 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         this.clustering = clustering;
     }
 
-    protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key);
+    protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
     protected abstract PartitionGenerator newGenerator();
 
-    public OpDistribution get(Timing timing, int sampleCount)
+    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
     {
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
         {
-            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey());
+            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                                                generator, ratio.getKey(), isWarmup);
             for (Operation op : ops)
                 operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
         }
@@ -75,15 +76,18 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         {
             out.add(new OpDistributionFactory()
             {
-                public OpDistribution get(Timing timing, int sampleCount)
+                public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
                 {
-                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey());
+                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                                                                                          newGenerator(),
+                                                                                          ratio.getKey(),
+                                                                                          isWarmup);
                     if (ops.size() == 1)
                         return new FixedOpDistribution(ops.get(0));
                     List<Pair<Operation, Double>> ratios = new ArrayList<>();
                     for (Operation op : ops)
                         ratios.add(new Pair<>(op, 1d / ops.size()));
-                    return new SampledOpDistribution(new EnumeratedDistribution<Operation>(ratios), new DistributionFixed(1));
+                    return new SampledOpDistribution(new EnumeratedDistribution<>(ratios), new DistributionFixed(1));
                 }
 
                 public String desc()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index b435abb..1f9a2c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.StressSettings;
@@ -32,7 +33,7 @@ import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 
-public abstract class PredefinedOperation extends Operation
+public abstract class PredefinedOperation extends PartitionOperation
 {
     public static final byte[] EMPTY_BYTE_ARRAY = {};
     public final Command type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 1c88490..166d689 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -32,11 +32,12 @@ import com.datastax.driver.core.PreparedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.Timer;
 
-public abstract class SchemaStatement extends Operation
+public abstract class SchemaStatement extends PartitionOperation
 {
 
     final PreparedStatement statement;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
new file mode 100644
index 0000000..60a6c48
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.naming.OperationNotSupportedException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressYaml;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+public class TokenRangeQuery extends Operation // reads a table one token range at a time, one page per run
+{
+    private final ThreadLocal<State> currentState = new ThreadLocal<>(); // per-thread range in progress; unset between ranges
+
+    private final TableMetadata tableMetadata;
+    private final TokenRangeIterator tokenRangeIterator;
+    private final String columns;
+    private final int pageSize;
+    private final boolean isWarmup;
+
+    public TokenRangeQuery(Timer timer,
+                           StressSettings settings,
+                           TableMetadata tableMetadata,
+                           TokenRangeIterator tokenRangeIterator,
+                           StressYaml.TokenRangeQueryDef def,
+                           boolean isWarmup)
+    {
+        super(timer, settings);
+        this.tableMetadata = tableMetadata;
+        this.tokenRangeIterator = tokenRangeIterator;
+        this.columns = sanitizeColumns(def.columns, tableMetadata);
+        this.pageSize = isWarmup ? Math.min(100, def.page_size) : def.page_size; // warmup caps the page size at 100
+        this.isWarmup = isWarmup;
+    }
+
+    /**
+     * We need to specify the columns by name because we need to add token(partition_keys) in order to count
+     * partitions. So if the user specifies '*' then replace it with a list of all columns.
+     */
+    private static String sanitizeColumns(String columns, TableMetadata tableMetadata)
+    {
+        if (!columns.equals("*")) // anything other than '*' is used exactly as given
+            return columns;
+
+        return String.join(", ", tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList()));
+    }
+
+    /**
+     * The state of a token range currently being retrieved.
+     * Here we store the paging state to retrieve more pages
+     * and we keep track of which partitions have already been retrieved.
+     */
+    private final static class State
+    {
+        public final TokenRange tokenRange;
+        public final String query;
+        public PagingState pagingState; // null until the first page of the range has been executed
+        public Set<Token> partitions = new HashSet<>(); // tokens already counted, so each partition is counted once per range
+
+        public State(TokenRange tokenRange, String query)
+        {
+            this.tokenRange = tokenRange;
+            this.query = query;
+        }
+
+        @Override
+        public String toString() // "[start, end]" of the token range
+        {
+            return String.format("[%s, %s]", tokenRange.getStart(), tokenRange.getEnd());
+        }
+    }
+
+    abstract static class Runner implements RunOp // holds the counters reported through partitionCount() and rowCount()
+    {
+        int partitionCount;
+        int rowCount;
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception // executes one page of the current token range; always returns true
+        {
+            State state = currentState.get();
+            if (state == null)
+            { // start processing a new token range
+                TokenRange range = tokenRangeIterator.next();
+                if (range == null)
+                    return true; // no more token ranges to process
+
+                state = new State(range, buildQuery(range));
+                currentState.set(state);
+            }
+
+            ResultSet results;
+            Statement statement = new SimpleStatement(state.query);
+            statement.setFetchSize(pageSize);
+
+            if (state.pagingState != null) // resume the range from where the previous run stopped
+                statement.setPagingState(state.pagingState);
+
+            results = client.getSession().execute(statement);
+            state.pagingState = results.getExecutionInfo().getPagingState(); // remembered for the next run on this range
+
+            int remaining = results.getAvailableWithoutFetching(); // rows that can be read without another fetch
+            rowCount += remaining;
+
+            for (Row row : results)
+            {
+                // this call will only succeed if we've added token(partition keys) to the query
+                Token partition = row.getPartitionKeyToken();
+                if (!state.partitions.contains(partition))
+                {
+                    partitionCount += 1;
+                    state.partitions.add(partition);
+                }
+
+                if (--remaining == 0) // stop once the rows counted above have been consumed
+                    break;
+            }
+
+            if (results.isExhausted() || isWarmup)
+            { // no more pages to fetch or just warming up, ready to move on to another token range
+                currentState.set(null);
+            }
+
+            return true;
+        }
+    }
+
+    private String buildQuery(TokenRange tokenRange) // SELECT token(pk), <columns> FROM <table> [WHERE token(pk) > start AND token(pk) <= end]
+    {
+        Token start = tokenRange.getStart();
+        Token end = tokenRange.getEnd();
+        List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+        String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));
+
+        StringBuilder ret = new StringBuilder();
+        ret.append("SELECT ");
+        ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
+        ret.append(", ");
+        ret.append(columns);
+        ret.append(" FROM ");
+        ret.append(tableMetadata.getName());
+        if (start != null || end != null) // a WHERE clause is only needed when at least one bound exists
+            ret.append(" WHERE ");
+        if (start != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" > "); // the start bound is exclusive
+            ret.append(start.toString());
+        }
+
+        if (start != null && end != null)
+            ret.append(" AND ");
+
+        if (end != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" <= "); // the end bound is inclusive
+            ret.append(end.toString());
+        }
+
+        return ret.toString();
+    }
+
+    private static class ThriftRun extends Runner // placeholder runner: run() always throws because thrift is not supported here
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            throw new OperationNotSupportedException("Bulk read over thrift not supported");
+        }
+    }
+
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client)); // a fresh runner, and therefore fresh counters, for every run
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+    public boolean ready(WorkManager workManager, RateLimiter rateLimiter) // false when there is nothing to read or no permit was granted
+    {
+        tokenRangeIterator.update(); // lets the iterator refill its pending ranges first
+
+        if (tokenRangeIterator.exhausted() && currentState.get() == null) // no ranges pending and none in progress on this thread
+            return false;
+
+        int numLeft = workManager.takePermits(1); // a single permit is requested per run
+        if (rateLimiter != null && numLeft > 0 )
+            rateLimiter.acquire(numLeft);
+
+        return numLeft > 0;
+    }
+
+    public String key() // "-" when this thread has no range in progress, otherwise the range bounds
+    {
+        State state = currentState.get();
+        return state == null ? "-" : state.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index c07328a..33f6f80 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
@@ -44,7 +45,7 @@ import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.utils.Pair;
 import org.apache.thrift.TException;
 
-public class ValidatingSchemaQuery extends Operation
+public class ValidatingSchemaQuery extends PartitionOperation
 {
     private Pair<Row, Row> bounds;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 30933d9..6d8e184 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -39,7 +39,8 @@ public enum CliOption
     TRANSPORT("Custom transport factories", SettingsTransport.helpPrinter()),
     PORT("The port to connect to cassandra nodes on", SettingsPort.helpPrinter()),
     SENDTO("-send-to", "Specify a stress server to send this command to", SettingsMisc.sendToDaemonHelpPrinter()),
-    GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter())
+    GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter()),
+    TOKENRANGE("Token range settings", SettingsTokenRange.helpPrinter())
     ;
 
     private static final Map<String, CliOption> LOOKUP;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index 83f444c..c2f2591 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -51,7 +51,7 @@ public class SettingsCommandPreDefined extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
-            public OpDistribution get(Timing timing, int sampleCount)
+            public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
             {
                 return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
                                                newGenerator(settings), seeds, settings, add));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 861b1a4..dd11452 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -55,7 +55,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
         final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<Command>(ratios, clustering)
         {
-            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key)
+            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, Command key, boolean isWarmup)
             {
                 return Collections.singletonList(PredefinedOperation.operation(key, timer, generator, seeds, settings, add));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index 8440e8e..36cbefe 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.stress.StressProfile;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
 import org.apache.cassandra.stress.util.Timer;
@@ -69,15 +70,24 @@ public class SettingsCommandUser extends SettingsCommand
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
         final SeedManager seeds = new SeedManager(settings);
+        final TokenRangeIterator tokenRangeIterator = profile.tokenRangeQueries.isEmpty()
+                                                      ? null
+                                                      : new TokenRangeIterator(settings,
+                                                                               profile.maybeLoadTokenRanges(settings));
+
         return new SampledOpDistributionFactory<String>(ratios, clustering)
         {
-            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key)
+            protected List<? extends Operation> get(Timer timer, PartitionGenerator generator, String key, boolean isWarmup)
             {
                 if (key.equalsIgnoreCase("insert"))
                     return Collections.singletonList(profile.getInsert(timer, generator, seeds, settings));
                 if (key.equalsIgnoreCase("validate"))
                     return profile.getValidate(timer, generator, seeds, settings);
-                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings));
+
+                if (profile.tokenRangeQueries.containsKey(key))
+                    return Collections.singletonList(profile.getBulkReadQueries(key, timer, settings, tokenRangeIterator, isWarmup));
+
+                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings, isWarmup));
             }
 
             protected PartitionGenerator newGenerator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
new file mode 100644
index 0000000..8fb0048
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.settings;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+
+public class SettingsTokenRange // settings read from the -tokenrange command line group
+{
+    public final boolean wrap; // true when the wrap option was set by the user
+    public final int splitFactor; // parsed from the split-factor option and narrowed to int with a checked cast
+
+    public SettingsTokenRange(TokenRangeOptions options)
+    {
+        this.wrap = options.wrap.setByUser();
+        this.splitFactor = Ints.checkedCast(OptionDistribution.parseLong(options.splitFactor.value())); // checkedCast rejects values outside the int range
+    }
+
+    private static final class TokenRangeOptions extends GroupedOptions
+    {
+        final OptionSimple wrap = new OptionSimple("wrap", "", null, "Re-use token ranges in order to terminate stress iterations", false);
+        final OptionSimple splitFactor = new OptionSimple("split-factor=", "[0-9]+[bmk]?", "1", "Split every token range by this factor", false); // presumably "1" is the default value - confirm against OptionSimple
+
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return ImmutableList.<Option>builder().add(wrap, splitFactor).build();
+        }
+    }
+
+    public static SettingsTokenRange get(Map<String, String[]> clArgs) // builds the settings from the parsed arguments, removing the -tokenrange entry
+    {
+        String[] params = clArgs.remove("-tokenrange");
+        if (params == null) // group not supplied: build the settings from untouched options
+        {
+            return new SettingsTokenRange(new TokenRangeOptions());
+        }
+        TokenRangeOptions options = GroupedOptions.select(params, new TokenRangeOptions());
+        if (options == null) // a null selection is treated as invalid input: print help and exit
+        {
+            printHelp();
+            System.out.println("Invalid -tokenrange options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsTokenRange(options);
+    }
+
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-tokenrange", new TokenRangeOptions());
+    }
+
+    public static Runnable helpPrinter() // printHelp wrapped as a Runnable
+    {
+        return SettingsTokenRange::printHelp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index 26f65b5..6625bc8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -55,6 +55,7 @@ public class StressSettings implements Serializable
     public final SettingsPort port;
     public final String sendToDaemon;
     public final SettingsGraph graph;
+    public final SettingsTokenRange tokenRange;
 
     public StressSettings(SettingsCommand command,
                           SettingsRate rate,
@@ -70,7 +71,8 @@ public class StressSettings implements Serializable
                           SettingsTransport transport,
                           SettingsPort port,
                           String sendToDaemon,
-                          SettingsGraph graph)
+                          SettingsGraph graph,
+                          SettingsTokenRange tokenRange)
     {
         this.command = command;
         this.rate = rate;
@@ -87,6 +89,7 @@ public class StressSettings implements Serializable
         this.port = port;
         this.sendToDaemon = sendToDaemon;
         this.graph = graph;
+        this.tokenRange = tokenRange;
     }
 
     private SmartThriftClient tclient;
@@ -269,6 +272,7 @@ public class StressSettings implements Serializable
         SettingsPort port = SettingsPort.get(clArgs);
         SettingsRate rate = SettingsRate.get(clArgs, command);
         SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
+        SettingsTokenRange tokenRange = SettingsTokenRange.get(clArgs);
         SettingsInsert insert = SettingsInsert.get(clArgs);
         SettingsColumn columns = SettingsColumn.get(clArgs);
         SettingsSamples samples = SettingsSamples.get(clArgs);
@@ -296,7 +300,7 @@ public class StressSettings implements Serializable
             System.exit(1);
         }
 
-        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, graph);
+        return new StressSettings(command, rate, generate, insert, columns, samples, errors, log, mode, node, schema, transport, port, sendToDaemon, graph, tokenRange);
     }
 
     private static Map<String, String[]> parseMap(String[] args)