You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 16:08:08 UTC
[1/3] cassandra git commit: cassandra-stress simultaneous inserts
over same seed (take two)
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 51f7cad48 -> 597a1d5db
refs/heads/trunk b5795ef96 -> 5876d9342
cassandra-stress simultaneous inserts over same seed
(take two)
patch by benedict; reviewed by rstupp CASSANDRA-7964
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/597a1d5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/597a1d5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/597a1d5d
Branch: refs/heads/cassandra-2.1
Commit: 597a1d5db27ef9e37f7066868e76cc9450fc3c9c
Parents: 51f7cad
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:15 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:15 2014 +0000
----------------------------------------------------------------------
.../apache/cassandra/stress/WorkManager.java | 65 ++
.../stress/generate/PartitionIterator.java | 632 +++++++++++++++++++
.../SampledOpDistributionFactory.java | 1 +
3 files changed, 698 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
new file mode 100644
index 0000000..c6a3eee
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.stress;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+interface WorkManager
+{
+ // -1 indicates consumer should terminate
+ int takePermits(int count);
+
+ // signal all consumers to terminate
+ void stop();
+
+ static final class FixedWorkManager implements WorkManager
+ {
+
+ final AtomicLong permits;
+
+ public FixedWorkManager(long permits)
+ {
+ this.permits = new AtomicLong(permits);
+ }
+
+ @Override
+ public int takePermits(int count)
+ {
+ while (true)
+ {
+ long cur = permits.get();
+ if (cur == 0)
+ return -1;
+ count = (int) Math.min(count, cur);
+ long next = cur - count;
+ if (permits.compareAndSet(cur, next))
+ return count;
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ permits.getAndSet(0);
+ }
+ }
+
+ static final class ContinuousWorkManager implements WorkManager
+ {
+
+ volatile boolean stop = false;
+
+ @Override
+ public int takePermits(int count)
+ {
+ if (stop)
+ return -1;
+ return count;
+ }
+
+ @Override
+ public void stop()
+ {
+ stop = true;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
new file mode 100644
index 0000000..baab867
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -0,0 +1,632 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.values.Generator;
+
+// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
+public abstract class PartitionIterator implements Iterator<Row>
+{
+
+ // we reuse the row object to save garbage
+ abstract boolean reset(double useChance, int targetCount, Operation op);
+
+ long idseed;
+ Seed seed;
+ final Object[] partitionKey;
+ final PartitionGenerator generator;
+ final SeedManager seedManager;
+ final Row row;
+
+ public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
+ {
+ if (generator.clusteringComponents.size() > 0)
+ return new MultiRowIterator(generator, seedManager);
+ else
+ return new SingleRowIterator(generator, seedManager);
+ }
+
+ private PartitionIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ this.generator = generator;
+ this.seedManager = seedManager;
+ this.partitionKey = new Object[generator.partitionKey.size()];
+ this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
+ }
+
+ private void setSeed(Seed seed)
+ {
+ long idseed = 0;
+ for (int i = 0 ; i < partitionKey.length ; i++)
+ {
+ Generator generator = this.generator.partitionKey.get(i);
+ // set the partition key seed based on the current work item we're processing
+ generator.setSeed(seed.seed);
+ Object key = generator.generate();
+ partitionKey[i] = key;
+ // then contribute this value to the data seed
+ idseed = seed(key, generator.type, idseed);
+ }
+ this.seed = seed;
+ this.idseed = idseed;
+ }
+
+ public boolean reset(Seed seed, double useChance, Operation op)
+ {
+ setSeed(seed);
+ return reset(useChance, 0, op);
+ }
+
+ public boolean reset(Seed seed, int targetCount, Operation op)
+ {
+ setSeed(seed);
+ return reset(Double.NaN, targetCount, op);
+ }
+
+ static class SingleRowIterator extends PartitionIterator
+ {
+ boolean done;
+ boolean isWrite;
+
+ private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ super(generator, seedManager);
+ }
+
+ boolean reset(double useChance, int targetCount, Operation op)
+ {
+ done = false;
+ isWrite = op.isWrite();
+ return true;
+ }
+
+ public boolean hasNext()
+ {
+ return !done;
+ }
+
+ public Row next()
+ {
+ if (done)
+ throw new NoSuchElementException();
+ for (int i = 0 ; i < row.row.length ; i++)
+ {
+ Generator gen = generator.valueComponents.get(i);
+ gen.setSeed(idseed);
+ row.row[i] = gen.generate();
+ }
+ done = true;
+ if (isWrite)
+ {
+ seedManager.markFirstWrite(seed, true);
+ seedManager.markLastWrite(seed, true);
+ }
+ return row;
+ }
+ }
+
+ // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
+ // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
+ // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
+ // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
+ // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+ // TODO : support first/last row, and constraining reads to rows we know are populated
+ static class MultiRowIterator extends PartitionIterator
+ {
+
+ // probability any single row will be generated in this iteration
+ double useChance;
+
+ // the seed used to generate the current values for the clustering components at each depth;
+ // used to save recalculating it for each row, so we only need to recalc from prior row.
+ final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
+ // the components remaining to be visited for each level of the current stack
+ final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+
+ // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
+ // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
+ final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+ final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ // track where in the partition we are, and where we are limited to
+ final int[] position = new int[generator.clusteringComponents.size()];
+ final int[] limit = new int[position.length];
+ boolean hasNext, isFirstWrite, isWrite;
+
+ // reusable collections for generating unique and sorted clustering components
+ final Set<Object> unique = new HashSet<>();
+ final List<Object> tosort = new ArrayList<>();
+
+ MultiRowIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ super(generator, seedManager);
+ for (int i = 0 ; i < clusteringComponents.length ; i++)
+ clusteringComponents[i] = new ArrayDeque<>();
+ rollmodifier[0] = 1f;
+ chancemodifier[0] = generator.clusteringDescendantAverages[0];
+ }
+
+ // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+ // count to decide how much we should return in one iteration
+ boolean reset(double useChance, int targetCount, Operation op)
+ {
+ if (this.useChance < 1d)
+ {
+ // we clear our prior roll-modifiers if the use chance was previously less-than zero
+ Arrays.fill(rollmodifier, 1d);
+ Arrays.fill(chancemodifier, 1d);
+ }
+
+ // set the seed for the first clustering component
+ generator.clusteringComponents.get(0).setSeed(idseed);
+
+ // calculate how many first clustering components we'll generate, and how many total rows this predicts
+ int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
+ int expectedRowCount;
+
+ int position = seed.position();
+ isWrite = op.isWrite();
+
+ if (isWrite)
+ expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
+ else if (position != 0)
+ expectedRowCount = setLimit(position);
+ else
+ expectedRowCount = setNoLimit(firstComponentCount);
+
+ if (Double.isNaN(useChance))
+ useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+ this.useChance = useChance;
+
+ while (true)
+ {
+ // TODO: we could avoid repopulating these each loop, by tracking our prior position
+ for (Queue<?> q : clusteringComponents)
+ q.clear();
+ clusteringSeeds[0] = idseed;
+ fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+
+ // we loop in case we have picked an entirely non-existent range, in which case
+ // we will reset the seed's position, then try again (until we exhaust it or find
+ // some real range) - this only happens for writes, so we only keep this logic in the loop
+
+ if (isWrite)
+ {
+ position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
+ isFirstWrite = position == 0;
+ }
+
+ // seek to our start position
+ switch (seek(isWrite ? position : null))
+ {
+ case END_OF_PARTITION:
+ return false;
+ case SUCCESS:
+ return true;
+ }
+
+ if (!isWrite)
+ throw new IllegalStateException();
+
+ // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+ }
+ }
+
+ private void decompose(int scalar, int[] decomposed)
+ {
+ for (int i = 0 ; i < decomposed.length ; i++)
+ {
+ int avg = generator.clusteringDescendantAverages[i];
+ decomposed[i] = scalar / avg;
+ scalar %= avg;
+ }
+ for (int i = limit.length - 1 ; i > 0 ; i--)
+ {
+ int avg = generator.clusteringComponentAverages[i];
+ if (decomposed[i] >= avg)
+ {
+ decomposed[i - 1] += decomposed[i] / avg;
+ decomposed[i] %= avg;
+ }
+ }
+ }
+
+ private int setNoLimit(int firstComponentCount)
+ {
+ Arrays.fill(limit, Integer.MAX_VALUE);
+ return firstComponentCount * generator.clusteringDescendantAverages[0];
+ }
+
+ private int setLimit(int position)
+ {
+ decompose(position, limit);
+ int expectedRowCount = 0;
+ for (int i = 0 ; i < limit.length ; i++)
+ {
+ int l = limit[i];
+ expectedRowCount += l * generator.clusteringDescendantAverages[i];
+ }
+ return expectedRowCount;
+ }
+
+ static enum State
+ {
+ END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
+ }
+
+ // seek to the provided position (or the first entry if null)
+ private State seek(int scalar)
+ {
+ if (scalar == 0)
+ {
+ this.position[0] = -1;
+ clusteringComponents[0].addFirst(this);
+ return setHasNext(advance(0, true));
+ }
+
+ int[] position = this.position;
+ decompose(scalar, position);
+ boolean incremented = false;
+ for (int i = 0 ; i < position.length ; i++)
+ {
+ if (i != 0)
+ fill(i);
+ for (int c = position[i] ; c > 0 ; c--)
+ clusteringComponents[i].poll();
+
+ // we can have started from a position that does not exist, in which
+ // case we need to ascend back up our clustering components, advancing as we go
+ if (clusteringComponents[i].isEmpty())
+ {
+ int j = i;
+ while (--j >= 0)
+ {
+ clusteringComponents[j].poll();
+ if (!clusteringComponents[j].isEmpty())
+ break;
+ }
+
+ // if we've exhausted the whole partition, we're done
+ if (j < 0)
+ return setHasNext(false);
+
+ // we don't check here to see if we've exceeded our limit,
+ // because if we came to a non-existent position and generated a limit
+ // we want to at least find the next real position, and set it on the seed
+ // in this case we do then yield false and select a different seed to continue with
+ position[j]++;
+ Arrays.fill(position, j + 1, position.length, 0);
+ while (j < i)
+ fill(++j);
+ incremented = true;
+ }
+ if (clusteringComponents[i].isEmpty())
+ throw new IllegalStateException();
+ row.row[i] = clusteringComponents[i].peek();
+ }
+
+ if (incremented && compareToLastRow() > 0)
+ return setHasNext(false);
+
+ position[position.length - 1]--;
+ // call advance so we honour any select chance
+ clusteringComponents[position.length - 1].addFirst(this);
+
+ return setHasNext(advance(position.length - 1, true));
+ }
+
+ // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+ // to move the iterator to the next item
+ void advance()
+ {
+ // we are always at the leaf level when this method is invoked
+ // so we calculate the seed for generating the row by combining the seed that generated the clustering components
+ int depth = clusteringComponents.length - 1;
+ long parentSeed = clusteringSeeds[depth];
+ long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+ // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
+ for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
+ {
+ Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
+ gen.setSeed(rowSeed);
+ row.row[i] = gen.generate();
+ }
+
+ // then we advance the leaf level
+ setHasNext(advance(depth, false));
+ }
+
+ private boolean advance(int depth, boolean first)
+ {
+ // advance the leaf component
+ clusteringComponents[depth].poll();
+ position[depth]++;
+ while (true)
+ {
+ if (clusteringComponents[depth].isEmpty())
+ {
+ // if we've run out of clustering components at this level, ascend
+ if (depth == 0)
+ return false;
+ depth--;
+ clusteringComponents[depth].poll();
+ if (++position[depth] > limit[depth])
+ return false;
+ continue;
+ }
+
+ int compareToLastRow = compareToLastRow();
+ if (compareToLastRow > 0 && !first)
+ return false;
+ boolean forceReturnOne = first && compareToLastRow >= 0;
+
+ // the chance of descending is the uniform usechance, multiplied by the number of children
+ // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
+ // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
+ // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
+ // chance of beating this next roll
+ double thischance = useChance * chancemodifier[depth];
+ if (forceReturnOne || thischance > 0.99999f || thischance >= random.nextDouble())
+ {
+ // if we're descending, we fill in our clustering component and increase our depth
+ row.row[depth] = clusteringComponents[depth].peek();
+ depth++;
+ if (depth == clusteringComponents.length)
+ return true;
+ // if we haven't reached the leaf, we update our probability statistics, fill in all of
+ // this level's clustering components, and repeat
+ if (useChance < 1d)
+ {
+ rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+ chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
+ }
+ position[depth] = 0;
+ fill(depth);
+ continue;
+ }
+
+ if (compareToLastRow >= 0)
+ return false;
+
+ // if we don't descend, we remove the clustering suffix we've skipped and continue
+ clusteringComponents[depth].poll();
+ position[depth]++;
+ }
+ }
+
+ private static int compare(int[] a, int[] b)
+ {
+ for (int i = 0 ; i != a.length ; i++)
+ if (a[i] != b[i])
+ return Integer.compare(a[i], b[i]);
+ return 0;
+ }
+
+ private int compareToLastRow()
+ {
+ int c = position.length - 1;
+ for (int i = 0 ; i <= c ; i++)
+ {
+ int p = position[i], l = limit[i], r = clusteringComponents[i].size();
+ if (i == c && p == l - 1)
+ return 0;
+ if ((p < l) & (r > 1))
+ return -1;
+ if (p > l)
+ return 1;
+ }
+ return 1;
+ }
+
+ // generate the clustering components for the provided depth; requires preceding components
+ // to have been generated and their seeds populated into clusteringSeeds
+ void fill(int depth)
+ {
+ long seed = clusteringSeeds[depth - 1];
+ Generator gen = generator.clusteringComponents.get(depth);
+ gen.setSeed(seed);
+ clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+ fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+ }
+
+ // generate the clustering components into the queue
+ void fill(Queue<Object> queue, int count, Generator generator)
+ {
+ if (count == 1)
+ {
+ queue.add(generator.generate());
+ return;
+ }
+
+ switch (this.generator.order)
+ {
+ case SORTED:
+ if (Comparable.class.isAssignableFrom(generator.clazz))
+ {
+ tosort.clear();
+ for (int i = 0 ; i < count ; i++)
+ tosort.add(generator.generate());
+ Collections.sort((List<Comparable>) (List<?>) tosort);
+ for (int i = 0 ; i < count ; i++)
+ if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+ queue.add(tosort.get(i));
+ break;
+ }
+ case ARBITRARY:
+ unique.clear();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ queue.add(next);
+ }
+ break;
+ case SHUFFLED:
+ unique.clear();
+ tosort.clear();
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ tosort.add(next);
+ }
+ for (int i = 0 ; i < tosort.size() ; i++)
+ {
+ int index = rand.nextInt(i, tosort.size());
+ Object obj = tosort.get(index);
+ tosort.set(index, tosort.get(i));
+ queue.add(obj);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return hasNext;
+ }
+
+ public Row next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ advance();
+ return row;
+ }
+
+ public boolean finishedPartition()
+ {
+ return clusteringComponents[0].isEmpty();
+ }
+
+ private State setHasNext(boolean hasNext)
+ {
+ if (!hasNext)
+ {
+ this.hasNext = false;
+ boolean isLast = finishedPartition();
+ if (isWrite)
+ {
+ boolean isFirst = isFirstWrite;
+ if (isFirst)
+ seedManager.markFirstWrite(seed, isLast);
+ if (isLast)
+ seedManager.markLastWrite(seed, isFirst);
+ }
+ return isLast ? State.END_OF_PARTITION : State.AFTER_LIMIT;
+ }
+ this.hasNext = hasNext;
+ return State.SUCCESS;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+ // any children of this child
+ static long seed(Object object, AbstractType type, long seed)
+ {
+ if (object instanceof ByteBuffer)
+ {
+ ByteBuffer buf = (ByteBuffer) object;
+ for (int i = buf.position() ; i < buf.limit() ; i++)
+ seed = (31 * seed) + buf.get(i);
+ return seed;
+ }
+ else if (object instanceof String)
+ {
+ String str = (String) object;
+ for (int i = 0 ; i < str.length() ; i++)
+ seed = (31 * seed) + str.charAt(i);
+ return seed;
+ }
+ else if (object instanceof Number)
+ {
+ return (seed * 31) + ((Number) object).longValue();
+ }
+ else if (object instanceof UUID)
+ {
+ return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
+ }
+ else
+ {
+ return seed(type.decompose(object), BytesType.instance, seed);
+ }
+ }
+
+ public Object getPartitionKey(int i)
+ {
+ return partitionKey[i];
+ }
+
+ public String getKeyAsString()
+ {
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ for (Object key : partitionKey)
+ {
+ if (i > 0)
+ sb.append("|");
+ AbstractType type = generator.partitionKey.get(i++).type;
+ sb.append(type.getString(type.decompose(key)));
+ }
+ return sb.toString();
+ }
+
+ // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
+ public ByteBuffer getToken()
+ {
+ return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9e1a5e8..9713e93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.util.Pair;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.util.Timer;
public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
[2/3] cassandra git commit: cassandra-stress simultaneous inserts
over same seed (take two)
Posted by be...@apache.org.
cassandra-stress simultaneous inserts over same seed
(take two)
patch by benedict; reviewed by rstupp CASSANDRA-7964
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/597a1d5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/597a1d5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/597a1d5d
Branch: refs/heads/trunk
Commit: 597a1d5db27ef9e37f7066868e76cc9450fc3c9c
Parents: 51f7cad
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:15 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:15 2014 +0000
----------------------------------------------------------------------
.../apache/cassandra/stress/WorkManager.java | 65 ++
.../stress/generate/PartitionIterator.java | 632 +++++++++++++++++++
.../SampledOpDistributionFactory.java | 1 +
3 files changed, 698 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
new file mode 100644
index 0000000..c6a3eee
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.stress;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+interface WorkManager
+{
+ // -1 indicates consumer should terminate
+ int takePermits(int count);
+
+ // signal all consumers to terminate
+ void stop();
+
+ static final class FixedWorkManager implements WorkManager
+ {
+
+ final AtomicLong permits;
+
+ public FixedWorkManager(long permits)
+ {
+ this.permits = new AtomicLong(permits);
+ }
+
+ @Override
+ public int takePermits(int count)
+ {
+ while (true)
+ {
+ long cur = permits.get();
+ if (cur == 0)
+ return -1;
+ count = (int) Math.min(count, cur);
+ long next = cur - count;
+ if (permits.compareAndSet(cur, next))
+ return count;
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ permits.getAndSet(0);
+ }
+ }
+
+ static final class ContinuousWorkManager implements WorkManager
+ {
+
+ volatile boolean stop = false;
+
+ @Override
+ public int takePermits(int count)
+ {
+ if (stop)
+ return -1;
+ return count;
+ }
+
+ @Override
+ public void stop()
+ {
+ stop = true;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
new file mode 100644
index 0000000..baab867
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -0,0 +1,632 @@
+package org.apache.cassandra.stress.generate;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.values.Generator;
+
+// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
+public abstract class PartitionIterator implements Iterator<Row>
+{
+
+ // we reuse the row object to save garbage
+ abstract boolean reset(double useChance, int targetCount, Operation op);
+
+ long idseed;
+ Seed seed;
+ final Object[] partitionKey;
+ final PartitionGenerator generator;
+ final SeedManager seedManager;
+ final Row row;
+
+ public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
+ {
+ if (generator.clusteringComponents.size() > 0)
+ return new MultiRowIterator(generator, seedManager);
+ else
+ return new SingleRowIterator(generator, seedManager);
+ }
+
+ private PartitionIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ this.generator = generator;
+ this.seedManager = seedManager;
+ this.partitionKey = new Object[generator.partitionKey.size()];
+ this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
+ }
+
+ private void setSeed(Seed seed)
+ {
+ long idseed = 0;
+ for (int i = 0 ; i < partitionKey.length ; i++)
+ {
+ Generator generator = this.generator.partitionKey.get(i);
+ // set the partition key seed based on the current work item we're processing
+ generator.setSeed(seed.seed);
+ Object key = generator.generate();
+ partitionKey[i] = key;
+ // then contribute this value to the data seed
+ idseed = seed(key, generator.type, idseed);
+ }
+ this.seed = seed;
+ this.idseed = idseed;
+ }
+
+ public boolean reset(Seed seed, double useChance, Operation op)
+ {
+ setSeed(seed);
+ return reset(useChance, 0, op);
+ }
+
+ public boolean reset(Seed seed, int targetCount, Operation op)
+ {
+ setSeed(seed);
+ return reset(Double.NaN, targetCount, op);
+ }
+
+ static class SingleRowIterator extends PartitionIterator
+ {
+ boolean done;
+ boolean isWrite;
+
+ private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ super(generator, seedManager);
+ }
+
+ boolean reset(double useChance, int targetCount, Operation op)
+ {
+ done = false;
+ isWrite = op.isWrite();
+ return true;
+ }
+
+ public boolean hasNext()
+ {
+ return !done;
+ }
+
+ public Row next()
+ {
+ if (done)
+ throw new NoSuchElementException();
+ for (int i = 0 ; i < row.row.length ; i++)
+ {
+ Generator gen = generator.valueComponents.get(i);
+ gen.setSeed(idseed);
+ row.row[i] = gen.generate();
+ }
+ done = true;
+ if (isWrite)
+ {
+ seedManager.markFirstWrite(seed, true);
+ seedManager.markLastWrite(seed, true);
+ }
+ return row;
+ }
+ }
+
+ // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
+ // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
+ // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
+ // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
+ // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+ // TODO : support first/last row, and constraining reads to rows we know are populated
+ static class MultiRowIterator extends PartitionIterator
+ {
+
+ // probability any single row will be generated in this iteration
+ double useChance;
+
+ // the seed used to generate the current values for the clustering components at each depth;
+ // used to save recalculating it for each row, so we only need to recalc from prior row.
+ final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
+ // the components remaining to be visited for each level of the current stack
+ final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+
+ // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
+ // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
+ final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+ final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ // track where in the partition we are, and where we are limited to
+ final int[] position = new int[generator.clusteringComponents.size()];
+ final int[] limit = new int[position.length];
+ boolean hasNext, isFirstWrite, isWrite;
+
+ // reusable collections for generating unique and sorted clustering components
+ final Set<Object> unique = new HashSet<>();
+ final List<Object> tosort = new ArrayList<>();
+
+ MultiRowIterator(PartitionGenerator generator, SeedManager seedManager)
+ {
+ super(generator, seedManager);
+ for (int i = 0 ; i < clusteringComponents.length ; i++)
+ clusteringComponents[i] = new ArrayDeque<>();
+ rollmodifier[0] = 1f;
+ chancemodifier[0] = generator.clusteringDescendantAverages[0];
+ }
+
+ // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+ // count to decide how much we should return in one iteration
+ boolean reset(double useChance, int targetCount, Operation op)
+ {
+ if (this.useChance < 1d)
+ {
+ // we clear our prior roll-modifiers if the use chance was previously less-than zero
+ Arrays.fill(rollmodifier, 1d);
+ Arrays.fill(chancemodifier, 1d);
+ }
+
+ // set the seed for the first clustering component
+ generator.clusteringComponents.get(0).setSeed(idseed);
+
+ // calculate how many first clustering components we'll generate, and how many total rows this predicts
+ int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
+ int expectedRowCount;
+
+ int position = seed.position();
+ isWrite = op.isWrite();
+
+ if (isWrite)
+ expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
+ else if (position != 0)
+ expectedRowCount = setLimit(position);
+ else
+ expectedRowCount = setNoLimit(firstComponentCount);
+
+ if (Double.isNaN(useChance))
+ useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+ this.useChance = useChance;
+
+ while (true)
+ {
+ // TODO: we could avoid repopulating these each loop, by tracking our prior position
+ for (Queue<?> q : clusteringComponents)
+ q.clear();
+ clusteringSeeds[0] = idseed;
+ fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+
+ // we loop in case we have picked an entirely non-existent range, in which case
+ // we will reset the seed's position, then try again (until we exhaust it or find
+ // some real range) - this only happens for writes, so we only keep this logic in the loop
+
+ if (isWrite)
+ {
+ position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
+ isFirstWrite = position == 0;
+ }
+
+ // seek to our start position
+ switch (seek(isWrite ? position : null))
+ {
+ case END_OF_PARTITION:
+ return false;
+ case SUCCESS:
+ return true;
+ }
+
+ if (!isWrite)
+ throw new IllegalStateException();
+
+ // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+ }
+ }
+
+ private void decompose(int scalar, int[] decomposed)
+ {
+ for (int i = 0 ; i < decomposed.length ; i++)
+ {
+ int avg = generator.clusteringDescendantAverages[i];
+ decomposed[i] = scalar / avg;
+ scalar %= avg;
+ }
+ for (int i = limit.length - 1 ; i > 0 ; i--)
+ {
+ int avg = generator.clusteringComponentAverages[i];
+ if (decomposed[i] >= avg)
+ {
+ decomposed[i - 1] += decomposed[i] / avg;
+ decomposed[i] %= avg;
+ }
+ }
+ }
+
+ private int setNoLimit(int firstComponentCount)
+ {
+ Arrays.fill(limit, Integer.MAX_VALUE);
+ return firstComponentCount * generator.clusteringDescendantAverages[0];
+ }
+
+ private int setLimit(int position)
+ {
+ decompose(position, limit);
+ int expectedRowCount = 0;
+ for (int i = 0 ; i < limit.length ; i++)
+ {
+ int l = limit[i];
+ expectedRowCount += l * generator.clusteringDescendantAverages[i];
+ }
+ return expectedRowCount;
+ }
+
+ static enum State
+ {
+ END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
+ }
+
+ // seek to the provided position (or the first entry if null)
+ private State seek(int scalar)
+ {
+ if (scalar == 0)
+ {
+ this.position[0] = -1;
+ clusteringComponents[0].addFirst(this);
+ return setHasNext(advance(0, true));
+ }
+
+ int[] position = this.position;
+ decompose(scalar, position);
+ boolean incremented = false;
+ for (int i = 0 ; i < position.length ; i++)
+ {
+ if (i != 0)
+ fill(i);
+ for (int c = position[i] ; c > 0 ; c--)
+ clusteringComponents[i].poll();
+
+ // we can have started from a position that does not exist, in which
+ // case we need to ascend back up our clustering components, advancing as we go
+ if (clusteringComponents[i].isEmpty())
+ {
+ int j = i;
+ while (--j >= 0)
+ {
+ clusteringComponents[j].poll();
+ if (!clusteringComponents[j].isEmpty())
+ break;
+ }
+
+ // if we've exhausted the whole partition, we're done
+ if (j < 0)
+ return setHasNext(false);
+
+ // we don't check here to see if we've exceeded our limit,
+ // because if we came to a non-existent position and generated a limit
+ // we want to at least find the next real position, and set it on the seed
+ // in this case we do then yield false and select a different seed to continue with
+ position[j]++;
+ Arrays.fill(position, j + 1, position.length, 0);
+ while (j < i)
+ fill(++j);
+ incremented = true;
+ }
+ if (clusteringComponents[i].isEmpty())
+ throw new IllegalStateException();
+ row.row[i] = clusteringComponents[i].peek();
+ }
+
+ if (incremented && compareToLastRow() > 0)
+ return setHasNext(false);
+
+ position[position.length - 1]--;
+ // call advance so we honour any select chance
+ clusteringComponents[position.length - 1].addFirst(this);
+
+ return setHasNext(advance(position.length - 1, true));
+ }
+
+ // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+ // to move the iterator to the next item
+ void advance()
+ {
+ // we are always at the leaf level when this method is invoked
+ // so we calculate the seed for generating the row by combining the seed that generated the clustering components
+ int depth = clusteringComponents.length - 1;
+ long parentSeed = clusteringSeeds[depth];
+ long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+ // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
+ for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
+ {
+ Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
+ gen.setSeed(rowSeed);
+ row.row[i] = gen.generate();
+ }
+
+ // then we advance the leaf level
+ setHasNext(advance(depth, false));
+ }
+
+ private boolean advance(int depth, boolean first)
+ {
+ // advance the leaf component
+ clusteringComponents[depth].poll();
+ position[depth]++;
+ while (true)
+ {
+ if (clusteringComponents[depth].isEmpty())
+ {
+ // if we've run out of clustering components at this level, ascend
+ if (depth == 0)
+ return false;
+ depth--;
+ clusteringComponents[depth].poll();
+ if (++position[depth] > limit[depth])
+ return false;
+ continue;
+ }
+
+ int compareToLastRow = compareToLastRow();
+ if (compareToLastRow > 0 && !first)
+ return false;
+ boolean forceReturnOne = first && compareToLastRow >= 0;
+
+ // the chance of descending is the uniform usechance, multiplied by the number of children
+ // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
+ // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
+ // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
+ // chance of beating this next roll
+ double thischance = useChance * chancemodifier[depth];
+ if (forceReturnOne || thischance > 0.99999f || thischance >= random.nextDouble())
+ {
+ // if we're descending, we fill in our clustering component and increase our depth
+ row.row[depth] = clusteringComponents[depth].peek();
+ depth++;
+ if (depth == clusteringComponents.length)
+ return true;
+ // if we haven't reached the leaf, we update our probability statistics, fill in all of
+ // this level's clustering components, and repeat
+ if (useChance < 1d)
+ {
+ rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+ chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
+ }
+ position[depth] = 0;
+ fill(depth);
+ continue;
+ }
+
+ if (compareToLastRow >= 0)
+ return false;
+
+ // if we don't descend, we remove the clustering suffix we've skipped and continue
+ clusteringComponents[depth].poll();
+ position[depth]++;
+ }
+ }
+
+ private static int compare(int[] a, int[] b)
+ {
+ for (int i = 0 ; i != a.length ; i++)
+ if (a[i] != b[i])
+ return Integer.compare(a[i], b[i]);
+ return 0;
+ }
+
+ private int compareToLastRow()
+ {
+ int c = position.length - 1;
+ for (int i = 0 ; i <= c ; i++)
+ {
+ int p = position[i], l = limit[i], r = clusteringComponents[i].size();
+ if (i == c && p == l - 1)
+ return 0;
+ if ((p < l) & (r > 1))
+ return -1;
+ if (p > l)
+ return 1;
+ }
+ return 1;
+ }
+
+ // generate the clustering components for the provided depth; requires preceding components
+ // to have been generated and their seeds populated into clusteringSeeds
+ void fill(int depth)
+ {
+ long seed = clusteringSeeds[depth - 1];
+ Generator gen = generator.clusteringComponents.get(depth);
+ gen.setSeed(seed);
+ clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+ fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+ }
+
+ // generate the clustering components into the queue
+ void fill(Queue<Object> queue, int count, Generator generator)
+ {
+ if (count == 1)
+ {
+ queue.add(generator.generate());
+ return;
+ }
+
+ switch (this.generator.order)
+ {
+ case SORTED:
+ if (Comparable.class.isAssignableFrom(generator.clazz))
+ {
+ tosort.clear();
+ for (int i = 0 ; i < count ; i++)
+ tosort.add(generator.generate());
+ Collections.sort((List<Comparable>) (List<?>) tosort);
+ for (int i = 0 ; i < count ; i++)
+ if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+ queue.add(tosort.get(i));
+ break;
+ }
+ case ARBITRARY:
+ unique.clear();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ queue.add(next);
+ }
+ break;
+ case SHUFFLED:
+ unique.clear();
+ tosort.clear();
+ ThreadLocalRandom rand = ThreadLocalRandom.current();
+ for (int i = 0 ; i < count ; i++)
+ {
+ Object next = generator.generate();
+ if (unique.add(next))
+ tosort.add(next);
+ }
+ for (int i = 0 ; i < tosort.size() ; i++)
+ {
+ int index = rand.nextInt(i, tosort.size());
+ Object obj = tosort.get(index);
+ tosort.set(index, tosort.get(i));
+ queue.add(obj);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return hasNext;
+ }
+
+ public Row next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ advance();
+ return row;
+ }
+
+ public boolean finishedPartition()
+ {
+ return clusteringComponents[0].isEmpty();
+ }
+
+ private State setHasNext(boolean hasNext)
+ {
+ if (!hasNext)
+ {
+ this.hasNext = false;
+ boolean isLast = finishedPartition();
+ if (isWrite)
+ {
+ boolean isFirst = isFirstWrite;
+ if (isFirst)
+ seedManager.markFirstWrite(seed, isLast);
+ if (isLast)
+ seedManager.markLastWrite(seed, isFirst);
+ }
+ return isLast ? State.END_OF_PARTITION : State.AFTER_LIMIT;
+ }
+ this.hasNext = hasNext;
+ return State.SUCCESS;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+ // any children of this child
+ static long seed(Object object, AbstractType type, long seed)
+ {
+ if (object instanceof ByteBuffer)
+ {
+ ByteBuffer buf = (ByteBuffer) object;
+ for (int i = buf.position() ; i < buf.limit() ; i++)
+ seed = (31 * seed) + buf.get(i);
+ return seed;
+ }
+ else if (object instanceof String)
+ {
+ String str = (String) object;
+ for (int i = 0 ; i < str.length() ; i++)
+ seed = (31 * seed) + str.charAt(i);
+ return seed;
+ }
+ else if (object instanceof Number)
+ {
+ return (seed * 31) + ((Number) object).longValue();
+ }
+ else if (object instanceof UUID)
+ {
+ return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
+ }
+ else
+ {
+ return seed(type.decompose(object), BytesType.instance, seed);
+ }
+ }
+
+ public Object getPartitionKey(int i)
+ {
+ return partitionKey[i];
+ }
+
+ public String getKeyAsString()
+ {
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ for (Object key : partitionKey)
+ {
+ if (i > 0)
+ sb.append("|");
+ AbstractType type = generator.partitionKey.get(i++).type;
+ sb.append(type.getString(type.decompose(key)));
+ }
+ return sb.toString();
+ }
+
+ // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
+ public ByteBuffer getToken()
+ {
+ return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9e1a5e8..9713e93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.util.Pair;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
import org.apache.cassandra.stress.util.Timer;
public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5876d934
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5876d934
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5876d934
Branch: refs/heads/trunk
Commit: 5876d93424ff49bba9db2de19442992c4dfe641d
Parents: b5795ef 597a1d5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:34 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:34 2014 +0000
----------------------------------------------------------------------
.../apache/cassandra/stress/WorkManager.java | 65 ++
.../stress/generate/PartitionIterator.java | 632 +++++++++++++++++++
.../SampledOpDistributionFactory.java | 1 +
3 files changed, 698 insertions(+)
----------------------------------------------------------------------