You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/07 16:38:50 UTC

[07/15] Improve stress workload realism

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
new file mode 100644
index 0000000..dba721d
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
@@ -0,0 +1,249 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.generate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.DynamicList;
+
+public class SeedManager
+{
+
+    final Distribution visits;
+    final Generator writes;
+    final Generator reads;
+    final ConcurrentHashMap<Seed, Seed> managing = new ConcurrentHashMap<>();
+    final DynamicList<Seed> sampleFrom;
+    final Distribution sample;
+
+    public SeedManager(StressSettings settings)
+    {
+        Generator writes, reads;
+        if (settings.generate.sequence != null)
+        {
+            long[] seq = settings.generate.sequence;
+            if (settings.generate.readlookback != null)
+            {
+                LookbackableWriteGenerator series = new LookbackableWriteGenerator(seq[0], seq[1], settings.generate.wrap, settings.generate.readlookback.get());
+                writes = series;
+                reads = series.reads;
+            }
+            else
+            {
+                writes = reads = new SeriesGenerator(seq[0], seq[1], settings.generate.wrap);
+            }
+        }
+        else
+        {
+            writes = reads = new RandomGenerator(settings.generate.distribution.get());
+        }
+        this.visits = settings.insert.visits.get();
+        this.writes = writes;
+        this.reads = reads;
+        this.sample = DistributionInverted.invert(settings.insert.revisit.get());
+        if (sample.maxValue() > Integer.MAX_VALUE || sample.minValue() < 0)
+            throw new IllegalArgumentException();
+        this.sampleFrom = new DynamicList<>((int) sample.maxValue());
+    }
+
+    public Seed next(Operation op)
+    {
+        if (!op.isWrite())
+        {
+            Seed seed = reads.next(-1);
+            if (seed == null)
+                return null;
+            Seed managing = this.managing.get(seed);
+            return managing == null ? seed : managing;
+        }
+
+        while (true)
+        {
+            int index = (int) sample.next();
+            Seed seed = sampleFrom.get(index);
+            if (seed != null && seed.take())
+                return seed;
+
+            seed = writes.next((int) visits.next());
+            if (seed == null)
+                return null;
+            // seeds are created HELD, so if we insert it successfully we have it exclusively for our write
+            if (managing.putIfAbsent(seed, seed) == null)
+                return seed;
+        }
+    }
+
+    public void markVisited(Seed seed, int[] position)
+    {
+        boolean first = seed.position == null;
+        seed.position = position;
+        finishedWriting(seed, first, false);
+    }
+
+    public void markFinished(Seed seed)
+    {
+        finishedWriting(seed, seed.position == null, true);
+    }
+
+    void finishedWriting(Seed seed, boolean first, boolean completed)
+    {
+        if (!completed)
+        {
+            if (first)
+                seed.poolNode = sampleFrom.append(seed);
+            seed.yield();
+        }
+        else
+        {
+            if (!first)
+                sampleFrom.remove(seed.poolNode);
+            managing.remove(seed);
+        }
+        if (first)
+            writes.finishWrite(seed);
+    }
+
+    private abstract class Generator
+    {
+        abstract Seed next(int visits);
+        void finishWrite(Seed seed) { }
+    }
+
+    private class RandomGenerator extends Generator
+    {
+
+        final Distribution distribution;
+
+        public RandomGenerator(Distribution distribution)
+        {
+            this.distribution = distribution;
+        }
+
+        public Seed next(int visits)
+        {
+            return new Seed(distribution.next(), visits);
+        }
+    }
+
+    private class SeriesGenerator extends Generator
+    {
+
+        final long start;
+        final long totalCount;
+        final boolean wrap;
+        final AtomicLong next = new AtomicLong();
+
+        public SeriesGenerator(long start, long end, boolean wrap)
+        {
+            this.wrap = wrap;
+            if (start > end)
+                throw new IllegalStateException();
+            this.start = start;
+            this.totalCount = 1 + end - start;
+        }
+
+        public Seed next(int visits)
+        {
+            long next = this.next.getAndIncrement();
+            if (!wrap && next >= totalCount)
+                return null;
+            return new Seed(start + (next % totalCount), visits);
+        }
+    }
+
+    private class LookbackableWriteGenerator extends SeriesGenerator
+    {
+
+        final AtomicLong writeCount = new AtomicLong();
+        final ConcurrentSkipListMap<Seed, Seed> afterMin = new ConcurrentSkipListMap<>();
+        final LookbackReadGenerator reads;
+
+        public LookbackableWriteGenerator(long start, long end, boolean wrap, Distribution readLookback)
+        {
+            super(start, end, wrap);
+            this.writeCount.set(0);
+            reads = new LookbackReadGenerator(readLookback);
+        }
+
+        public Seed next(int visits)
+        {
+            long next = this.next.getAndIncrement();
+            if (!wrap && next >= totalCount)
+                return null;
+            return new Seed(start + (next % totalCount), visits);
+        }
+
+        void finishWrite(Seed seed)
+        {
+            if (seed.seed <= writeCount.get())
+                return;
+            afterMin.put(seed, seed);
+            while (true)
+            {
+                Map.Entry<Seed, Seed> head = afterMin.firstEntry();
+                if (head == null)
+                    return;
+                long min = this.writeCount.get();
+                if (head.getKey().seed <= min)
+                    return;
+                if (head.getKey().seed == min + 1 && this.writeCount.compareAndSet(min, min + 1))
+                {
+                    afterMin.remove(head.getKey());
+                    continue;
+                }
+                return;
+            }
+        }
+
+        private class LookbackReadGenerator extends Generator
+        {
+
+            final Distribution lookback;
+
+            public LookbackReadGenerator(Distribution lookback)
+            {
+                this.lookback = lookback;
+                if (lookback.maxValue() > start + totalCount)
+                    throw new IllegalArgumentException("Invalid lookback distribution; max value is " + lookback.maxValue()
+                                                       + ", but series only ranges from " + writeCount + " to " + (start + totalCount));
+            }
+
+            public Seed next(int visits)
+            {
+                long lookback = this.lookback.next();
+                long range = writeCount.get();
+                long startOffset = range - lookback;
+                if (startOffset < 0)
+                {
+                    if (range == totalCount && !wrap)
+                        return null;
+                    startOffset = range == 0 ? 0 : lookback % range;
+                }
+                return new Seed(start + startOffset, visits);
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedRandomGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedRandomGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedRandomGenerator.java
deleted file mode 100644
index b590ffa..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedRandomGenerator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public class SeedRandomGenerator implements SeedGenerator
-{
-
-    final Distribution distribution;
-    final Distribution clustering;
-
-    private long next;
-    private int count;
-
-    public SeedRandomGenerator(Distribution distribution, Distribution clustering)
-    {
-        this.distribution = distribution;
-        this.clustering = clustering;
-    }
-
-    public long next(long workIndex)
-    {
-        if (count == 0)
-        {
-            next = distribution.next();
-            count = (int) clustering.next();
-        }
-        long result = next;
-        count--;
-        if (next == distribution.maxValue())
-            next = distribution.minValue();
-        else
-            next++;
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedSeriesGenerator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedSeriesGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedSeriesGenerator.java
deleted file mode 100644
index 78a8784..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedSeriesGenerator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public class SeedSeriesGenerator implements SeedGenerator
-{
-
-    final long min;
-    final long count;
-
-    public SeedSeriesGenerator(long min, long max)
-    {
-        if (min > max)
-            throw new IllegalStateException();
-        this.min = min;
-        this.count = 1 + max - min;
-    }
-
-    public long next(long workIndex)
-    {
-        return min + (workIndex % count);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Booleans.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Booleans.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Booleans.java
index b1d84d6..21525af 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Booleans.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Booleans.java
@@ -26,7 +26,7 @@ public class Booleans extends Generator<Boolean>
 {
     public Booleans(String name, GeneratorConfig config)
     {
-        super(BooleanType.instance, config, name);
+        super(BooleanType.instance, config, name, Boolean.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
index 2a5bddf..358163c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.stress.generate.values;
 
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.generate.FasterRandom;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -29,11 +30,11 @@ import java.util.Random;
 public class Bytes extends Generator<ByteBuffer>
 {
     private final byte[] bytes;
-    private final Random rand = new Random();
+    private final FasterRandom rand = new FasterRandom();
 
     public Bytes(String name, GeneratorConfig config)
     {
-        super(BytesType.instance, config, name);
+        super(BytesType.instance, config, name, ByteBuffer.class);
         bytes = new byte[(int) sizeDistribution.maxValue()];
     }
 
@@ -45,8 +46,8 @@ public class Bytes extends Generator<ByteBuffer>
         rand.setSeed(~seed);
         int size = (int) sizeDistribution.next();
         for (int i = 0; i < size; )
-            for (int v = rand.nextInt(),
-                 n = Math.min(size - i, Integer.SIZE/Byte.SIZE);
+            for (long v = rand.nextLong(),
+                 n = Math.min(size - i, Long.SIZE/Byte.SIZE);
                  n-- > 0; v >>= Byte.SIZE)
                 bytes[i++] = (byte)v;
         return ByteBuffer.wrap(Arrays.copyOf(bytes, size));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Dates.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Dates.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Dates.java
index 7d36be2..7350f57 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Dates.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Dates.java
@@ -30,9 +30,10 @@ public class Dates extends Generator<Date>
 {
     public Dates(String name, GeneratorConfig config)
     {
-        super(DateType.instance, config, name);
+        super(DateType.instance, config, name, Date.class);
     }
 
+    // TODO: let the range of values generated advance as stress test progresses
     @Override
     public Date generate()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Doubles.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Doubles.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Doubles.java
index 76e983d..0f04eb6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Doubles.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Doubles.java
@@ -26,7 +26,7 @@ public class Doubles extends Generator<Double>
 {
     public Doubles(String name, GeneratorConfig config)
     {
-        super(DoubleType.instance, config, name);
+        super(DoubleType.instance, config, name, Double.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Floats.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Floats.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Floats.java
index 8e23c11..19f449a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Floats.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Floats.java
@@ -26,7 +26,7 @@ public class Floats extends Generator<Float>
 {
     public Floats(String name, GeneratorConfig config)
     {
-        super(FloatType.instance, config, name);
+        super(FloatType.instance, config, name, Float.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Generator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Generator.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Generator.java
index 1040bb3..00f866a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Generator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Generator.java
@@ -31,15 +31,17 @@ public abstract class Generator<T>
 
     public final String name;
     public final AbstractType<T> type;
+    public final Class<T> clazz;
     final long salt;
     final Distribution identityDistribution;
     final Distribution sizeDistribution;
     public final Distribution clusteringDistribution;
 
-    public Generator(AbstractType<T> type, GeneratorConfig config, String name)
+    public Generator(AbstractType<T> type, GeneratorConfig config, String name, Class<T> clazz)
     {
         this.type = type;
         this.name = name;
+        this.clazz = clazz;
         this.salt = config.salt;
         this.identityDistribution = config.getIdentityDistribution(defaultIdentityDistribution());
         this.sizeDistribution = config.getSizeDistribution(defaultSizeDistribution());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexBytes.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/HexBytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexBytes.java
index db46bac..19f2cc3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/HexBytes.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexBytes.java
@@ -31,7 +31,7 @@ public class HexBytes extends Generator<ByteBuffer>
 
     public HexBytes(String name, GeneratorConfig config)
     {
-        super(BytesType.instance, config, name);
+        super(BytesType.instance, config, name, ByteBuffer.class);
         bytes = new byte[(int) sizeDistribution.maxValue()];
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexStrings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/HexStrings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexStrings.java
index ce65b8a..c811a61 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/HexStrings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/HexStrings.java
@@ -20,8 +20,6 @@
  */
 package org.apache.cassandra.stress.generate.values;
 
-import java.util.Random;
-
 import org.apache.cassandra.db.marshal.UTF8Type;
 
 public class HexStrings extends Generator<String>
@@ -30,7 +28,7 @@ public class HexStrings extends Generator<String>
 
     public HexStrings(String name, GeneratorConfig config)
     {
-        super(UTF8Type.instance, config, name);
+        super(UTF8Type.instance, config, name, String.class);
         chars = new char[(int) sizeDistribution.maxValue()];
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Inets.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Inets.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Inets.java
index 334d73c..107daad 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Inets.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Inets.java
@@ -31,7 +31,7 @@ public class Inets extends Generator<InetAddress>
     final byte[] buf;
     public Inets(String name, GeneratorConfig config)
     {
-        super(InetAddressType.instance, config, name);
+        super(InetAddressType.instance, config, name, InetAddress.class);
         buf = new byte[4];
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Integers.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Integers.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Integers.java
index 8b9b33a..e05c615 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Integers.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Integers.java
@@ -27,7 +27,7 @@ public class Integers extends Generator<Integer>
 
     public Integers(String name, GeneratorConfig config)
     {
-        super(Int32Type.instance, config, name);
+        super(Int32Type.instance, config, name, Integer.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Lists.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Lists.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Lists.java
index d188f7e..6480d7a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Lists.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Lists.java
@@ -33,7 +33,7 @@ public class Lists extends Generator<List>
 
     public Lists(String name, Generator valueType, GeneratorConfig config)
     {
-        super(ListType.getInstance(valueType.type), config, name);
+        super(ListType.getInstance(valueType.type), config, name, List.class);
         this.valueType = valueType;
         buffer = new Object[(int) sizeDistribution.maxValue()];
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Longs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Longs.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Longs.java
index 0584ed1..638ecd0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Longs.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Longs.java
@@ -26,7 +26,7 @@ public class Longs extends Generator<Long>
 {
     public Longs(String name, GeneratorConfig config)
     {
-        super(LongType.instance, config, name);
+        super(LongType.instance, config, name, Long.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Sets.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Sets.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Sets.java
index 48bf293..8246286 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Sets.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Sets.java
@@ -32,7 +32,7 @@ public class Sets extends Generator<Set>
 
     public Sets(String name, Generator valueType, GeneratorConfig config)
     {
-        super(SetType.getInstance(valueType.type), config, name);
+        super(SetType.getInstance(valueType.type), config, name, Set.class);
         this.valueType = valueType;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
index e01ff20..71aaae6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
@@ -23,15 +23,16 @@ package org.apache.cassandra.stress.generate.values;
 import java.util.Random;
 
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.stress.generate.FasterRandom;
 
 public class Strings extends Generator<String>
 {
     private final char[] chars;
-    private final Random rnd = new Random();
+    private final FasterRandom rnd = new FasterRandom();
 
     public Strings(String name, GeneratorConfig config)
     {
-        super(UTF8Type.instance, config, name);
+        super(UTF8Type.instance, config, name, String.class);
         chars = new char[(int) sizeDistribution.maxValue()];
     }
 
@@ -42,8 +43,11 @@ public class Strings extends Generator<String>
         sizeDistribution.setSeed(seed);
         rnd.setSeed(~seed);
         int size = (int) sizeDistribution.next();
-        for (int i = 0 ; i < size ; i++)
-            chars[i] = (char) (32 +rnd.nextInt(128-32));
+        for (int i = 0; i < size; )
+            for (long v = rnd.nextLong(),
+                 n = Math.min(size - i, Long.SIZE/Byte.SIZE);
+                 n-- > 0; v >>= Byte.SIZE)
+                chars[i++] = (char) (((v & 127) + 32) & 127);
         return new String(chars, 0, size);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
index 714959d..efe4b79 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
@@ -33,7 +33,7 @@ public class TimeUUIDs extends Generator<UUID>
 
     public TimeUUIDs(String name, GeneratorConfig config)
     {
-        super(TimeUUIDType.instance, config, name);
+        super(TimeUUIDType.instance, config, name, UUID.class);
         dateGen = new Dates(name, config);
         clockSeqAndNode = config.salt;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/values/UUIDs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/UUIDs.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/UUIDs.java
index e8d6501..faa58c6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/UUIDs.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/UUIDs.java
@@ -28,7 +28,7 @@ public class UUIDs extends Generator<UUID>
 {
     public UUIDs(String name, GeneratorConfig config)
     {
-        super(UUIDType.instance, config, name);
+        super(UUIDType.instance, config, name, UUID.class);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index f794e75..b7d1ee7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -81,4 +81,9 @@ public class CqlCounterAdder extends CqlOperation<Integer>
     {
         return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
     }
+
+    public boolean isWrite()
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index c422f2b..622eb14 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -76,4 +76,9 @@ public class CqlInserter extends CqlOperation<Integer>
     {
         return new CqlRunOpAlwaysSucceed(client, query, queryId, params, key, 1);
     }
+
+    public boolean isWrite()
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index 7f6412b..dba2e51 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -174,7 +174,7 @@ public abstract class PredefinedOperation extends Operation
 
     protected List<ByteBuffer> getColumnValues(ColumnSelection columns)
     {
-        Row row = partitions.get(0).iterator(1).batch(1f).iterator().next();
+        Row row = partitions.get(0).iterator(1, false).next().iterator().next();
         ByteBuffer[] r = new ByteBuffer[columns.count()];
         int c = 0;
         if (columns.indices != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
index ee766c3..4ee42e9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -43,6 +43,11 @@ public class ThriftCounterAdder extends PredefinedOperation
         this.counteradd = counteradd.get();
     }
 
+    public boolean isWrite()
+    {
+        return true;
+    }
+
     public void run(final ThriftClient client) throws IOException
     {
         List<CounterColumn> columns = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
index 5c2acfe..2500c2e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -42,6 +42,11 @@ public final class ThriftInserter extends PredefinedOperation
         super(Command.WRITE, timer, generator, settings);
     }
 
+    public boolean isWrite()
+    {
+        return true;
+    }
+
     public void run(final ThriftClient client) throws IOException
     {
         final ByteBuffer key = getKey();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index 673dafe..ffa965f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -45,15 +45,13 @@ public class SchemaInsert extends SchemaStatement
 {
 
     private final BatchStatement.Type batchType;
-    private final RatioDistribution perVisit;
-    private final RatioDistribution perBatch;
+    private final RatioDistribution selectChance;
 
-    public SchemaInsert(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount, RatioDistribution perVisit, RatioDistribution perBatch, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
+    public SchemaInsert(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution batchSize, RatioDistribution selectChance, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
     {
-        super(timer, generator, settings, partitionCount, statement, thriftId, cl, ValidationType.NOT_FAIL);
+        super(timer, generator, settings, batchSize, statement, thriftId, cl, ValidationType.NOT_FAIL);
         this.batchType = batchType;
-        this.perVisit = perVisit;
-        this.perBatch = perBatch;
+        this.selectChance = selectChance;
     }
 
     private class JavaDriverRun extends Runner
@@ -69,43 +67,42 @@ public class SchemaInsert extends SchemaStatement
         {
             Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
             for (int i = 0 ; i < iterators.length ; i++)
-                iterators[i] = partitions.get(i).iterator(perVisit.next());
+                iterators[i] = partitions.get(i).iterator(selectChance.next(), true);
             List<BoundStatement> stmts = new ArrayList<>();
             partitionCount = partitions.size();
 
-            boolean done;
-            do
+            for (Partition.RowIterator iterator : iterators)
             {
-                done = true;
-                stmts.clear();
-                for (Partition.RowIterator iterator : iterators)
-                {
-                    if (iterator.done())
-                        continue;
-
-                    for (Row row : iterator.batch(perBatch.next()))
-                        stmts.add(bindRow(row));
-
-                    done &= iterator.done();
-                }
+                if (iterator.done())
+                    continue;
 
-                rowCount += stmts.size();
+                for (Row row : iterator.next())
+                    stmts.add(bindRow(row));
+            }
+            rowCount += stmts.size();
 
+            // 65535 is max number of stmts per batch, so if we have more, we need to manually batch them
+            for (int j = 0 ; j < stmts.size() ; j += 65535)
+            {
+                List<BoundStatement> substmts = stmts.subList(j, Math.min(stmts.size(), j + 65535));
                 Statement stmt;
                 if (stmts.size() == 1)
                 {
-                    stmt = stmts.get(0);
+                    stmt = substmts.get(0);
                 }
                 else
                 {
                     BatchStatement batch = new BatchStatement(batchType);
                     batch.setConsistencyLevel(JavaDriverClient.from(cl));
-                    batch.addAll(stmts);
+                    batch.addAll(substmts);
                     stmt = batch;
                 }
+
                 validate(client.getSession().execute(stmt));
+            }
 
-            } while (!done);
+            for (Partition.RowIterator iterator : iterators)
+                iterator.markWriteFinished();
 
             return true;
         }
@@ -124,27 +121,23 @@ public class SchemaInsert extends SchemaStatement
         {
             Partition.RowIterator[] iterators = new Partition.RowIterator[partitions.size()];
             for (int i = 0 ; i < iterators.length ; i++)
-                iterators[i] = partitions.get(i).iterator(perVisit.next());
+                iterators[i] = partitions.get(i).iterator(selectChance.next(), true);
             partitionCount = partitions.size();
 
-            boolean done;
-            do
+            for (Partition.RowIterator iterator : iterators)
             {
-                done = true;
-                for (Partition.RowIterator iterator : iterators)
-                {
-                    if (iterator.done())
-                        continue;
+                if (iterator.done())
+                    continue;
 
-                    for (Row row : iterator.batch(perBatch.next()))
-                    {
-                        validate(client.execute_prepared_cql3_query(thriftId, iterator.partition().getToken(), thriftRowArgs(row), settings.command.consistencyLevel));
-                        rowCount += 1;
-                    }
-
-                    done &= iterator.done();
+                for (Row row : iterator.next())
+                {
+                    validate(client.execute_prepared_cql3_query(thriftId, iterator.partition().getToken(), thriftRowArgs(row), settings.command.consistencyLevel));
+                    rowCount += 1;
                 }
-            } while (!done);
+            }
+
+            for (Partition.RowIterator iterator : iterators)
+                iterator.markWriteFinished();
 
             return true;
         }
@@ -156,6 +149,11 @@ public class SchemaInsert extends SchemaStatement
         timeWithRetry(new JavaDriverRun(client));
     }
 
+    public boolean isWrite()
+    {
+        return true;
+    }
+
     @Override
     public void run(ThriftClient client) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index a047261..866f6ab 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -22,11 +22,18 @@ package org.apache.cassandra.stress.operations.userdefined;
 
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
+import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.stress.generate.Partition;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
 import org.apache.cassandra.stress.settings.OptionDistribution;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.settings.ValidationType;
@@ -39,19 +46,21 @@ import org.apache.cassandra.thrift.ThriftConversion;
 public class SchemaQuery extends SchemaStatement
 {
 
-    public SchemaQuery(Timer timer, PartitionGenerator generator, StressSettings settings, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ValidationType validationType)
+    public static enum ArgSelect
     {
-        super(timer, generator, settings, OptionDistribution.get("fixed(1)").get(), statement, thriftId, cl, validationType);
+        MULTIROW, SAMEROW;
+        //TODO: FIRSTROW, LASTROW
     }
 
-    int execute(JavaDriverClient client) throws Exception
-    {
-        return client.getSession().execute(bindRandom(partitions.get(0))).all().size();
-    }
+    final ArgSelect argSelect;
+    final Object[][] randomBuffer;
+    final Random random = new Random();
 
-    int execute(ThriftClient client) throws Exception
+    public SchemaQuery(Timer timer, PartitionGenerator generator, StressSettings settings, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ValidationType validationType, ArgSelect argSelect)
     {
-        return client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftRandomArgs(partitions.get(0)), ThriftConversion.toThrift(cl)).getRowsSize();
+        super(timer, generator, settings, OptionDistribution.get("fixed(1)").get(), statement, thriftId, cl, validationType);
+        this.argSelect = argSelect;
+        randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
     }
 
     private class JavaDriverRun extends Runner
@@ -65,7 +74,7 @@ public class SchemaQuery extends SchemaStatement
 
         public boolean run() throws Exception
         {
-            ResultSet rs = client.getSession().execute(bindRandom(partitions.get(0)));
+            ResultSet rs = client.getSession().execute(bindArgs(partitions.get(0)));
             validate(rs);
             rowCount = rs.all().size();
             partitionCount = Math.min(1, rowCount);
@@ -84,7 +93,7 @@ public class SchemaQuery extends SchemaStatement
 
         public boolean run() throws Exception
         {
-            CqlResult rs = client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftRandomArgs(partitions.get(0)), ThriftConversion.toThrift(cl));
+            CqlResult rs = client.execute_prepared_cql3_query(thriftId, partitions.get(0).getToken(), thriftArgs(partitions.get(0)), ThriftConversion.toThrift(cl));
             validate(rs);
             rowCount = rs.getRowsSize();
             partitionCount = Math.min(1, rowCount);
@@ -92,6 +101,64 @@ public class SchemaQuery extends SchemaStatement
         }
     }
 
+    private int fillRandom(Partition partition)
+    {
+        int c = 0;
+        while (c == 0)
+        {
+            for (Row row : partition.iterator(randomBuffer.length, false).next())
+            {
+                Object[] randomRow = randomBuffer[c++];
+                for (int i = 0 ; i < argumentIndex.length ; i++)
+                    randomRow[i] = row.get(argumentIndex[i]);
+                if (c >= randomBuffer.length)
+                    break;
+            }
+        }
+        return c;
+    }
+
+    BoundStatement bindArgs(Partition partition)
+    {
+        switch (argSelect)
+        {
+            case MULTIROW:
+                int c = fillRandom(partition);
+                for (int i = 0 ; i < argumentIndex.length ; i++)
+                {
+                    int argIndex = argumentIndex[i];
+                    bindBuffer[i] = randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i];
+                }
+                return statement.bind(bindBuffer);
+            case SAMEROW:
+                for (Row row : partition.iterator(1, false).next())
+                    return bindRow(row);
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    List<ByteBuffer> thriftArgs(Partition partition)
+    {
+        switch (argSelect)
+        {
+            case MULTIROW:
+                List<ByteBuffer> args = new ArrayList<>();
+                int c = fillRandom(partition);
+                for (int i = 0 ; i < argumentIndex.length ; i++)
+                {
+                    int argIndex = argumentIndex[i];
+                    args.add(generator.convert(argIndex, randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i]));
+                }
+                return args;
+            case SAMEROW:
+                for (Row row : partition.iterator(1, false).next())
+                    return thriftRowArgs(row);
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
     @Override
     public void run(JavaDriverClient client) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 2e0170c..1f7ed80 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -24,7 +24,6 @@ package org.apache.cassandra.stress.operations.userdefined;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
@@ -40,8 +39,6 @@ import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.Row;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.settings.ValidationType;
-import org.apache.cassandra.stress.util.JavaDriverClient;
-import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.transport.SimpleClient;
@@ -50,14 +47,12 @@ public abstract class SchemaStatement extends Operation
 {
 
     final PartitionGenerator generator;
-    private final PreparedStatement statement;
+    final PreparedStatement statement;
     final Integer thriftId;
     final ConsistencyLevel cl;
     final ValidationType validationType;
-    private final int[] argumentIndex;
-    private final Object[] bindBuffer;
-    private final Object[][] randomBuffer;
-    private final Random random = new Random();
+    final int[] argumentIndex;
+    final Object[] bindBuffer;
 
     public SchemaStatement(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount,
                            PreparedStatement statement, Integer thriftId, ConsistencyLevel cl, ValidationType validationType)
@@ -70,41 +65,19 @@ public abstract class SchemaStatement extends Operation
         this.validationType = validationType;
         argumentIndex = new int[statement.getVariables().size()];
         bindBuffer = new Object[argumentIndex.length];
-        randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
         int i = 0;
         for (ColumnDefinitions.Definition definition : statement.getVariables())
             argumentIndex[i++] = generator.indexOf(definition.getName());
     }
 
-    private int filLRandom(Partition partition)
-    {
-        int c = 0;
-        for (Row row : partition.iterator(randomBuffer.length).batch(1f))
-        {
-            Object[] randomRow = randomBuffer[c++];
-            for (int i = 0 ; i < argumentIndex.length ; i++)
-                randomRow[i] = row.get(argumentIndex[i]);
-            if (c >= randomBuffer.length)
-                break;
-        }
-        return c;
-    }
-
-    BoundStatement bindRandom(Partition partition)
-    {
-        int c = filLRandom(partition);
-        for (int i = 0 ; i < argumentIndex.length ; i++)
-        {
-            int argIndex = argumentIndex[i];
-            bindBuffer[i] = randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i];
-        }
-        return statement.bind(bindBuffer);
-    }
-
     BoundStatement bindRow(Row row)
     {
         for (int i = 0 ; i < argumentIndex.length ; i++)
+        {
             bindBuffer[i] = row.get(argumentIndex[i]);
+            if (bindBuffer[i] == null && !generator.permitNulls(argumentIndex[i]))
+                throw new IllegalStateException();
+        }
         return statement.bind(bindBuffer);
     }
 
@@ -116,18 +89,6 @@ public abstract class SchemaStatement extends Operation
         return args;
     }
 
-    List<ByteBuffer> thriftRandomArgs(Partition partition)
-    {
-        List<ByteBuffer> args = new ArrayList<>();
-        int c = filLRandom(partition);
-        for (int i = 0 ; i < argumentIndex.length ; i++)
-        {
-            int argIndex = argumentIndex[i];
-            args.add(generator.convert(argIndex, randomBuffer[argIndex < 0 ? 0 : random.nextInt(c)][i]));
-        }
-        return args;
-    }
-
     void validate(ResultSet rs)
     {
         switch (validationType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 0e8ff1b..4d7c039 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -26,7 +26,8 @@ import java.util.Map;
 
 public enum CliOption
 {
-    KEY("Key details such as size in bytes and value distribution", SettingsKey.helpPrinter()),
+    POP("Population distribution and intra-partition visit order", SettingsPopulation.helpPrinter()),
+    INSERT("Insert specific options relating to various methods for batching and splitting partition updates", SettingsInsert.helpPrinter()),
     COL("Column details such as size and count distribution, data generator, names, comparator and if super columns should be used", SettingsColumn.helpPrinter()),
     RATE("Thread count, rate limit or automatic mode (default is auto)", SettingsRate.helpPrinter()),
     MODE("Thrift or CQL with options", SettingsMode.helpPrinter()),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
index 70a85ae..ef3dbb1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
@@ -25,6 +25,8 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Function;
+
 import org.apache.cassandra.stress.generate.*;
 import org.apache.commons.math3.distribution.ExponentialDistribution;
 import org.apache.commons.math3.distribution.NormalDistribution;
@@ -38,6 +40,14 @@ import org.apache.commons.math3.random.JDKRandomGenerator;
 public class OptionDistribution extends Option
 {
 
+    public static final Function<String, DistributionFactory> BUILDER = new Function<String, DistributionFactory>()
+    {
+        public DistributionFactory apply(String s)
+        {
+            return get(s);
+        }
+    };
+
     private static final Pattern FULL = Pattern.compile("(~?)([A-Z]+)\\((.+)\\)", Pattern.CASE_INSENSITIVE);
     private static final Pattern ARGS = Pattern.compile("[^,]+");
 
@@ -45,12 +55,19 @@ public class OptionDistribution extends Option
     private String spec;
     private final String defaultSpec;
     private final String description;
+    private final boolean required;
 
     public OptionDistribution(String prefix, String defaultSpec, String description)
     {
+        this(prefix, defaultSpec, description, defaultSpec == null);
+    }
+
+    public OptionDistribution(String prefix, String defaultSpec, String description, boolean required)
+    {
         this.prefix = prefix;
         this.defaultSpec = defaultSpec;
         this.description = description;
+        this.required = required;
     }
 
     @Override
@@ -82,13 +99,13 @@ public class OptionDistribution extends Option
 
     public DistributionFactory get()
     {
-        return spec != null ? get(spec) : get(defaultSpec);
+        return spec != null ? get(spec) : defaultSpec != null ? get(defaultSpec) : null;
     }
 
     @Override
     public boolean happy()
     {
-        return spec != null || defaultSpec != null;
+        return !required || spec != null;
     }
 
     public String longDisplay()
@@ -102,12 +119,13 @@ public class OptionDistribution extends Option
         return Arrays.asList(
                 GroupedOptions.formatMultiLine("EXP(min..max)", "An exponential distribution over the range [min..max]"),
                 GroupedOptions.formatMultiLine("EXTREME(min..max,shape)", "An extreme value (Weibull) distribution over the range [min..max]"),
+                GroupedOptions.formatMultiLine("QEXTREME(min..max,shape,quantas)", "An extreme value, split into quantas, within which the chance of selection is uniform"),
                 GroupedOptions.formatMultiLine("GAUSSIAN(min..max,stdvrng)", "A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng"),
                 GroupedOptions.formatMultiLine("GAUSSIAN(min..max,mean,stdev)", "A gaussian/normal distribution, with explicitly defined mean and stdev"),
                 GroupedOptions.formatMultiLine("UNIFORM(min..max)", "A uniform distribution over the range [min, max]"),
                 GroupedOptions.formatMultiLine("FIXED(val)", "A fixed distribution, always returning the same value"),
                 "Preceding the name with ~ will invert the distribution, e.g. ~exp(1..10) will yield 10 most, instead of least, often",
-                "Aliases: extr, gauss, normal, norm, weibull"
+                "Aliases: extr, qextr, gauss, normal, norm, weibull"
         );
     }
 
@@ -128,7 +146,9 @@ public class OptionDistribution extends Option
         final Map<String, Impl> lookup = new HashMap<>();
         lookup.put("exp", new ExponentialImpl());
         lookup.put("extr", new ExtremeImpl());
-        lookup.put("extreme", lookup.get("extreme"));
+        lookup.put("qextr", new QuantizedExtremeImpl());
+        lookup.put("extreme", lookup.get("extr"));
+        lookup.put("qextreme", lookup.get("qextr"));
         lookup.put("weibull", lookup.get("weibull"));
         lookup.put("gaussian", new GaussianImpl());
         lookup.put("normal", lookup.get("gaussian"));
@@ -245,6 +265,32 @@ public class OptionDistribution extends Option
         }
     }
 
+    private static final class QuantizedExtremeImpl implements Impl
+    {
+        @Override
+        public DistributionFactory getFactory(List<String> params)
+        {
+            if (params.size() != 3)
+                throw new IllegalArgumentException("Invalid parameter list for quantized extreme (Weibull) distribution: " + params);
+            try
+            {
+                String[] bounds = params.get(0).split("\\.\\.+");
+                final long min = parseLong(bounds[0]);
+                final long max = parseLong(bounds[1]);
+                final double shape = Double.parseDouble(params.get(1));
+                final int quantas = Integer.parseInt(params.get(2));
+                WeibullDistribution findBounds = new WeibullDistribution(shape, 1d);
+                // max probability should be roughly equal to accuracy of (max-min) to ensure all values are visitable,
+                // over entire range, but this results in overly skewed distribution, so take sqrt
+                final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
+                return new QuantizedExtremeFactory(min, max, shape, scale, quantas);
+            } catch (Exception _)
+            {
+                throw new IllegalArgumentException("Invalid parameter list for quantized extreme (Weibull) distribution: " + params);
+            }
+        }
+    }
+
     private static final class UniformImpl implements Impl
     {
 
@@ -319,7 +365,7 @@ public class OptionDistribution extends Option
         }
     }
 
-    private static final class ExtremeFactory implements DistributionFactory
+    private static class ExtremeFactory implements DistributionFactory
     {
         final long min, max;
         final double shape, scale;
@@ -338,6 +384,22 @@ public class OptionDistribution extends Option
         }
     }
 
+    private static final class QuantizedExtremeFactory extends ExtremeFactory
+    {
+        final int quantas;
+        private QuantizedExtremeFactory(long min, long max, double shape, double scale, int quantas)
+        {
+            super(min, max, shape, scale);
+            this.quantas = quantas;
+        }
+
+        @Override
+        public Distribution get()
+        {
+            return new DistributionQuantized(new DistributionOffsetApache(new WeibullDistribution(new JDKRandomGenerator(), shape, scale, WeibullDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY), min, max), quantas);
+        }
+    }
+
     private static final class GaussianFactory implements DistributionFactory
     {
         final long min, max;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
index 2459c20..aacb616 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Function;
 import org.apache.commons.math3.distribution.ExponentialDistribution;
 import org.apache.commons.math3.distribution.NormalDistribution;
 import org.apache.commons.math3.distribution.UniformRealDistribution;
@@ -50,16 +51,29 @@ import org.apache.cassandra.stress.generate.RatioDistributionFactory;
 public class OptionRatioDistribution extends Option
 {
 
+    public static final Function<String, RatioDistributionFactory> BUILDER = new Function<String, RatioDistributionFactory>()
+    {
+        public RatioDistributionFactory apply(String s)
+        {
+            return get(s);
+        }
+    };
+
     private static final Pattern FULL = Pattern.compile("(.*)/([0-9]+[KMB]?)", Pattern.CASE_INSENSITIVE);
 
     final OptionDistribution delegate;
     private double divisor;
-
-    private static final RatioDistribution DEFAULT = new RatioDistribution(new DistributionFixed(1), 1);
+    final String defaultSpec;
 
     public OptionRatioDistribution(String prefix, String defaultSpec, String description)
     {
-        delegate = new OptionDistribution(prefix, defaultSpec, description);
+        this(prefix, defaultSpec, description, defaultSpec != null);
+    }
+
+    public OptionRatioDistribution(String prefix, String defaultSpec, String description, boolean required)
+    {
+        delegate = new OptionDistribution(prefix, null, description, required);
+        this.defaultSpec = defaultSpec;
     }
 
     @Override
@@ -74,7 +88,7 @@ public class OptionRatioDistribution extends Option
 
     public static RatioDistributionFactory get(String spec)
     {
-        OptionRatioDistribution opt = new OptionRatioDistribution("", "", "");
+        OptionRatioDistribution opt = new OptionRatioDistribution("", "", "", true);
         if (!opt.accept(spec))
             throw new IllegalArgumentException();
         return opt.get();
@@ -82,7 +96,14 @@ public class OptionRatioDistribution extends Option
 
     public RatioDistributionFactory get()
     {
-        return !delegate.setByUser() ? new DefaultFactory() : new DelegateFactory(delegate.get(), divisor);
+        if (delegate.setByUser())
+            return new DelegateFactory(delegate.get(), divisor);
+        if (defaultSpec == null)
+            return null;
+        OptionRatioDistribution sub = new OptionRatioDistribution(delegate.prefix, null, null, true);
+        if (!sub.accept(defaultSpec))
+            throw new IllegalStateException("Invalid default spec: " + defaultSpec);
+        return sub.get();
     }
 
     @Override
@@ -124,15 +145,6 @@ public class OptionRatioDistribution extends Option
 
     // factories
 
-    private static final class DefaultFactory implements RatioDistributionFactory
-    {
-        @Override
-        public RatioDistribution get()
-        {
-            return DEFAULT;
-        }
-    }
-
     private static final class DelegateFactory implements RatioDistributionFactory
     {
         final DistributionFactory delegate;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
index 032f00c..59accb9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 
@@ -35,8 +36,6 @@ public abstract class SettingsCommand implements Serializable
 
     public final Command type;
     public final long count;
-    public final int tries;
-    public final boolean ignoreErrors;
     public final boolean noWarmup;
     public final ConsistencyLevel consistencyLevel;
     public final double targetUncertainty;
@@ -56,8 +55,6 @@ public abstract class SettingsCommand implements Serializable
     public SettingsCommand(Command type, Options options, Count count, Uncertainty uncertainty)
     {
         this.type = type;
-        this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1);
-        this.ignoreErrors = options.ignoreErrors.setByUser();
         this.consistencyLevel = ConsistencyLevel.valueOf(options.consistencyLevel.value().toUpperCase());
         this.noWarmup = options.noWarmup.setByUser();
         if (count != null)
@@ -80,11 +77,8 @@ public abstract class SettingsCommand implements Serializable
 
     static abstract class Options extends GroupedOptions
     {
-        final OptionSimple retries = new OptionSimple("tries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false);
-        final OptionSimple ignoreErrors = new OptionSimple("ignore_errors", "", null, "Do not print/log errors", false);
-        final OptionSimple noWarmup = new OptionSimple("no_warmup", "", null, "Do not warmup the process", false);
+        final OptionSimple noWarmup = new OptionSimple("no-warmup", "", null, "Do not warmup the process", false);
         final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY", "ONE", "Consistency level to use", false);
-        final OptionSimple atOnce = new OptionSimple("at-once=", "[0-9]+", "1000", "Number of keys per operation for multiget", false);
     }
 
     static class Count extends Options
@@ -93,7 +87,7 @@ public abstract class SettingsCommand implements Serializable
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(count, retries, noWarmup, ignoreErrors, consistencyLevel, atOnce);
+            return Arrays.asList(count, noWarmup, consistencyLevel);
         }
     }
 
@@ -105,7 +99,7 @@ public abstract class SettingsCommand implements Serializable
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, retries, noWarmup, ignoreErrors, consistencyLevel, atOnce);
+            return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, consistencyLevel);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index ac113d1..5a8b604 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.generate.values.Bytes;
 import org.apache.cassandra.stress.generate.values.Generator;
 import org.apache.cassandra.stress.generate.values.GeneratorConfig;
@@ -42,14 +43,16 @@ public class SettingsCommandPreDefined extends SettingsCommand
 {
 
     public final DistributionFactory add;
+    public final int keySize;
 
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
+        final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
             public OpDistribution get(Timer timer)
             {
-                return new FixedOpDistribution(PredefinedOperation.operation(type, timer, newGenerator(settings), settings, add));
+                return new FixedOpDistribution(PredefinedOperation.operation(type, timer, newGenerator(settings, seeds), settings, add));
             }
 
             public String desc()
@@ -64,23 +67,24 @@ public class SettingsCommandPreDefined extends SettingsCommand
         };
     }
 
-    PartitionGenerator newGenerator(StressSettings settings)
+    PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds)
     {
         List<String> names = settings.columns.namestrs;
         List<Generator> partitionKey = Collections.<Generator>singletonList(new HexBytes("key",
                                        new GeneratorConfig("randomstrkey", null,
-                                                           OptionDistribution.get("fixed(" + settings.keys.keySize + ")"), null)));
+                                                           OptionDistribution.get("fixed(" + keySize + ")"), null)));
 
         List<Generator> columns = new ArrayList<>();
         for (int i = 0 ; i < settings.columns.maxColumnsPerKey ; i++)
             columns.add(new Bytes(names.get(i), new GeneratorConfig("randomstr" + names.get(i), null, settings.columns.sizeDistribution, null)));
-        return new PartitionGenerator(partitionKey, Collections.<Generator>emptyList(), columns);
+        return new PartitionGenerator(partitionKey, Collections.<Generator>emptyList(), columns, PartitionGenerator.Order.ARBITRARY, seeds);
     }
 
     public SettingsCommandPreDefined(Command type, Options options)
     {
         super(type, options.parent);
         add = options.add.get();
+        keySize = Integer.parseInt(options.keysize.value());
     }
 
     // Option Declarations
@@ -93,6 +97,7 @@ public class SettingsCommandPreDefined extends SettingsCommand
             this.parent = parent;
         }
         final OptionDistribution add = new OptionDistribution("add=", "fixed(1)", "Distribution of value of counter increments");
+        final OptionSimple keysize = new OptionSimple("keysize=", "[0-9]+", "10", "Key size in bytes", false);
 
         @Override
         public List<? extends Option> options()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index e5d4f80..5c9c70c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
 import org.apache.cassandra.stress.operations.predefined.PredefinedOperation;
@@ -54,6 +55,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
 
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
+        final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<Command>(ratios, clustering)
         {
             protected Operation get(Timer timer, PartitionGenerator generator, Command key)
@@ -63,7 +65,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
 
             protected PartitionGenerator newGenerator()
             {
-                return SettingsCommandPreDefinedMixed.this.newGenerator(settings);
+                return SettingsCommandPreDefinedMixed.this.newGenerator(settings, seeds);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index f36296e..88c6e1e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -27,10 +27,14 @@ import java.util.List;
 
 import org.apache.commons.math3.util.Pair;
 
+import com.google.common.collect.ImmutableList;
+
+import com.datastax.driver.core.BatchStatement;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.StressProfile;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
 import org.apache.cassandra.stress.util.Timer;
@@ -58,6 +62,7 @@ public class SettingsCommandUser extends SettingsCommand
 
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
+        final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory<String>(ratios, clustering)
         {
             protected Operation get(Timer timer, PartitionGenerator generator, String key)
@@ -69,7 +74,7 @@ public class SettingsCommandUser extends SettingsCommand
 
             protected PartitionGenerator newGenerator()
             {
-                return profile.newGenerator(settings);
+                return profile.newGenerator(settings, seeds);
             }
         };
     }
@@ -81,19 +86,14 @@ public class SettingsCommandUser extends SettingsCommand
         {
             this.parent = parent;
         }
-        final OptionDistribution clustering = new OptionDistribution("clustering=", "GAUSSIAN(1..10)", "Distribution clustering runs of operations of the same kind");
+        final OptionDistribution clustering = new OptionDistribution("clustering=", "gaussian(1..10)", "Distribution clustering runs of operations of the same kind");
         final OptionSimple profile = new OptionSimple("profile=", ".*", null, "Specify the path to a yaml cql3 profile", false);
         final OptionAnyProbabilities ops = new OptionAnyProbabilities("ops", "Specify the ratios for inserts/queries to perform; e.g. ops(insert=2,<query1>=1) will perform 2 inserts for each query1");
 
         @Override
         public List<? extends Option> options()
         {
-            final List<Option> options = new ArrayList<>();
-            options.add(clustering);
-            options.add(ops);
-            options.add(profile);
-            options.addAll(parent.options());
-            return options;
+            return ImmutableList.<Option>builder().add(ops, clustering, profile).addAll(parent.options()).build();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java
new file mode 100644
index 0000000..625f803
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java
@@ -0,0 +1,92 @@
+package org.apache.cassandra.stress.settings;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SettingsErrors implements Serializable
+{
+
+    public final boolean ignore;
+    public final int tries;
+
+    public SettingsErrors(Options options)
+    {
+        ignore = options.ignore.setByUser();
+        this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1);
+    }
+
+    // Option Declarations
+
+    public static final class Options extends GroupedOptions
+    {
+        final OptionSimple retries = new OptionSimple("retries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false);
+        final OptionSimple ignore = new OptionSimple("ignore", "", null, "Do not fail on errors", false);
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return Arrays.asList(retries, ignore);
+        }
+    }
+
+    // CLI Utility Methods
+
+    public static SettingsErrors get(Map<String, String[]> clArgs)
+    {
+        String[] params = clArgs.remove("-errors");
+        if (params == null)
+            return new SettingsErrors(new Options());
+
+        GroupedOptions options = GroupedOptions.select(params, new Options());
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -errors options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsErrors((Options) options);
+    }
+
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-errors", new Options());
+    }
+
+    public static Runnable helpPrinter()
+    {
+        return new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                printHelp();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
new file mode 100644
index 0000000..a6c298b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
@@ -0,0 +1,103 @@
+package org.apache.cassandra.stress.settings;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.datastax.driver.core.BatchStatement;
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.RatioDistributionFactory;
+
+public class SettingsInsert implements Serializable
+{
+
+    public final DistributionFactory revisit;
+    public final DistributionFactory visits;
+    public final DistributionFactory batchsize;
+    public final RatioDistributionFactory selectRatio;
+    public final BatchStatement.Type batchType;
+
+    private SettingsInsert(InsertOptions options)
+    {
+        this.visits= options.visits.get();
+        this.revisit = options.revisit.get();
+        this.batchsize = options.partitions.get();
+        this.selectRatio = options.selectRatio.get();
+        this.batchType = !options.batchType.setByUser() ? null : BatchStatement.Type.valueOf(options.batchType.value());
+    }
+
+    // Option Declarations
+
+    private static class InsertOptions extends GroupedOptions
+    {
+        final OptionDistribution visits = new OptionDistribution("visits=", "fixed(1)", "The target number of inserts to split a partition into; if more than one, the partition will be placed in the revisit set");
+        final OptionDistribution revisit = new OptionDistribution("revisit=", "uniform(1..1M)", "The distribution with which we revisit partial writes (see visits); implicitly defines size of revisit collection");
+        final OptionDistribution partitions = new OptionDistribution("partitions=", null, "The number of partitions to update in a single batch", false);
+        final OptionSimple batchType = new OptionSimple("batchtype=", "unlogged|logged|counter", null, "Specify the type of batch statement (LOGGED, UNLOGGED or COUNTER)", false);
+        final OptionRatioDistribution selectRatio = new OptionRatioDistribution("select-ratio=", null, "The uniform probability of visiting any CQL row in the generated partition", false);
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return Arrays.asList(revisit, visits, partitions, batchType, selectRatio);
+        }
+    }
+
+    // CLI Utility Methods
+
+    public static SettingsInsert get(Map<String, String[]> clArgs)
+    {
+        String[] params = clArgs.remove("-insert");
+        if (params == null)
+            return new SettingsInsert(new InsertOptions());
+
+        InsertOptions options = GroupedOptions.select(params, new InsertOptions());
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -insert options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsInsert(options);
+    }
+
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-insert", new InsertOptions());
+    }
+
+    public static Runnable helpPrinter()
+    {
+        return new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                printHelp();
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
deleted file mode 100644
index 017b106..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.cassandra.stress.settings;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.SeedGenerator;
-import org.apache.cassandra.stress.generate.SeedRandomGenerator;
-import org.apache.cassandra.stress.generate.SeedSeriesGenerator;
-
-// Settings for key generation
-public class SettingsKey implements Serializable
-{
-
-    final int keySize;
-    private final DistributionFactory distribution;
-    private final DistributionFactory clustering;
-    private final long[] range;
-
-    public SettingsKey(DistributionOptions options)
-    {
-        this.keySize = Integer.parseInt(options.size.value());
-        this.distribution = options.dist.get();
-        this.clustering = options.clustering.get();
-        this.range = null;
-    }
-
-    public SettingsKey(PopulateOptions options)
-    {
-        this.keySize = Integer.parseInt(options.size.value());
-        this.distribution = null;
-        this.clustering = null;
-        String[] bounds = options.populate.value().split("\\.\\.+");
-        this.range = new long[] { OptionDistribution.parseLong(bounds[0]), OptionDistribution.parseLong(bounds[1]) };
-    }
-
-    // Option Declarations
-
-    private static final class DistributionOptions extends GroupedOptions
-    {
-        final OptionDistribution dist;
-        final OptionDistribution clustering = new OptionDistribution("cluster=", "fixed(1)", "Keys are clustered in adjacent value runs of this size");
-        final OptionSimple size = new OptionSimple("size=", "[0-9]+", "10", "Key size in bytes", false);
-
-        public DistributionOptions(String defaultLimit)
-        {
-            dist = new OptionDistribution("dist=", "GAUSSIAN(1.." + defaultLimit + ")", "Keys are selected from this distribution");
-        }
-
-        @Override
-        public List<? extends Option> options()
-        {
-            return Arrays.asList(dist, size, clustering);
-        }
-    }
-
-    private static final class PopulateOptions extends GroupedOptions
-    {
-        final OptionSimple populate;
-        final OptionSimple size = new OptionSimple("size=", "[0-9]+", "10", "Key size in bytes", false);
-
-        public PopulateOptions(String defaultLimit)
-        {
-            populate = new OptionSimple("populate=", "[0-9]+\\.\\.+[0-9]+[MBK]?",
-                    "1.." + defaultLimit,
-                    "Populate all keys in sequence", true);
-        }
-
-        @Override
-        public List<? extends Option> options()
-        {
-            return Arrays.asList(populate, size);
-        }
-    }
-
-    public SeedGenerator newSeedGenerator()
-    {
-        return range == null ? new SeedRandomGenerator(distribution.get(), clustering.get()) : new SeedSeriesGenerator(range[0], range[1]);
-    }
-
-    // CLI Utility Methods
-
-    public static SettingsKey get(Map<String, String[]> clArgs, SettingsCommand command)
-    {
-        // set default size to number of commands requested, unless set to err convergence, then use 1M
-        String defaultLimit = command.count <= 0 ? "1000000" : Long.toString(command.count);
-
-        String[] params = clArgs.remove("-key");
-        if (params == null)
-        {
-            // return defaults:
-            switch(command.type)
-            {
-                case WRITE:
-                case COUNTER_WRITE:
-                    return new SettingsKey(new PopulateOptions(defaultLimit));
-                default:
-                    return new SettingsKey(new DistributionOptions(defaultLimit));
-            }
-        }
-        GroupedOptions options = GroupedOptions.select(params, new PopulateOptions(defaultLimit), new DistributionOptions(defaultLimit));
-        if (options == null)
-        {
-            printHelp();
-            System.out.println("Invalid -key options provided, see output for valid options");
-            System.exit(1);
-        }
-        return options instanceof PopulateOptions ?
-                new SettingsKey((PopulateOptions) options) :
-                new SettingsKey((DistributionOptions) options);
-    }
-
-    public static void printHelp()
-    {
-        GroupedOptions.printOptions(System.out, "-key", new PopulateOptions("N"), new DistributionOptions("N"));
-    }
-
-    public static Runnable helpPrinter()
-    {
-        return new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                printHelp();
-            }
-        };
-    }
-}
-