You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 16:08:08 UTC

[1/3] cassandra git commit: cassandra-stress simultaneous inserts over same seed (take two)

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 51f7cad48 -> 597a1d5db
  refs/heads/trunk b5795ef96 -> 5876d9342


cassandra-stress simultaneous inserts over same seed
(take two)

patch by benedict; reviewed by rstupp CASSANDRA-7964


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/597a1d5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/597a1d5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/597a1d5d

Branch: refs/heads/cassandra-2.1
Commit: 597a1d5db27ef9e37f7066868e76cc9450fc3c9c
Parents: 51f7cad
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:15 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:15 2014 +0000

----------------------------------------------------------------------
 .../apache/cassandra/stress/WorkManager.java    |  65 ++
 .../stress/generate/PartitionIterator.java      | 632 +++++++++++++++++++
 .../SampledOpDistributionFactory.java           |   1 +
 3 files changed, 698 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
new file mode 100644
index 0000000..c6a3eee
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.stress;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+interface WorkManager
+{
+    // -1 indicates consumer should terminate
+    int takePermits(int count);
+
+    // signal all consumers to terminate
+    void stop();
+
+    static final class FixedWorkManager implements WorkManager
+    {
+
+        final AtomicLong permits;
+
+        public FixedWorkManager(long permits)
+        {
+            this.permits = new AtomicLong(permits);
+        }
+
+        @Override
+        public int takePermits(int count)
+        {
+            while (true)
+            {
+                long cur = permits.get();
+                if (cur == 0)
+                    return -1;
+                count = (int) Math.min(count, cur);
+                long next = cur - count;
+                if (permits.compareAndSet(cur, next))
+                    return count;
+            }
+        }
+
+        @Override
+        public void stop()
+        {
+            permits.getAndSet(0);
+        }
+    }
+
+    static final class ContinuousWorkManager implements WorkManager
+    {
+
+        volatile boolean stop = false;
+
+        @Override
+        public int takePermits(int count)
+        {
+            if (stop)
+                return -1;
+            return count;
+        }
+
+        @Override
+        public void stop()
+        {
+            stop = true;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
new file mode 100644
index 0000000..baab867
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -0,0 +1,632 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.values.Generator;
+
+// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
+public abstract class PartitionIterator implements Iterator<Row>
+{
+
+    // we reuse the row object to save garbage
+    abstract boolean reset(double useChance, int targetCount, Operation op);
+
+    long idseed;
+    Seed seed;
+    final Object[] partitionKey;
+    final PartitionGenerator generator;
+    final SeedManager seedManager;
+    final Row row;
+
+    public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
+    {
+        if (generator.clusteringComponents.size() > 0)
+            return new MultiRowIterator(generator, seedManager);
+        else
+            return new SingleRowIterator(generator, seedManager);
+    }
+
+    private PartitionIterator(PartitionGenerator generator, SeedManager seedManager)
+    {
+        this.generator = generator;
+        this.seedManager = seedManager;
+        this.partitionKey = new Object[generator.partitionKey.size()];
+        this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
+    }
+
+    private void setSeed(Seed seed)
+    {
+        long idseed = 0;
+        for (int i = 0 ; i < partitionKey.length ; i++)
+        {
+            Generator generator = this.generator.partitionKey.get(i);
+            // set the partition key seed based on the current work item we're processing
+            generator.setSeed(seed.seed);
+            Object key = generator.generate();
+            partitionKey[i] = key;
+            // then contribute this value to the data seed
+            idseed = seed(key, generator.type, idseed);
+        }
+        this.seed = seed;
+        this.idseed = idseed;
+    }
+
+    public boolean reset(Seed seed, double useChance, Operation op)
+    {
+        setSeed(seed);
+        return reset(useChance, 0, op);
+    }
+
+    public boolean reset(Seed seed, int targetCount, Operation op)
+    {
+        setSeed(seed);
+        return reset(Double.NaN, targetCount, op);
+    }
+
+    static class SingleRowIterator extends PartitionIterator
+    {
+        boolean done;
+        boolean isWrite;
+
+        private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+        }
+
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            done = false;
+            isWrite = op.isWrite();
+            return true;
+        }
+
+        public boolean hasNext()
+        {
+            return !done;
+        }
+
+        public Row next()
+        {
+            if (done)
+                throw new NoSuchElementException();
+            for (int i = 0 ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i);
+                gen.setSeed(idseed);
+                row.row[i] = gen.generate();
+            }
+            done = true;
+            if (isWrite)
+            {
+                seedManager.markFirstWrite(seed, true);
+                seedManager.markLastWrite(seed, true);
+            }
+            return row;
+        }
+    }
+
+    // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
+    // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
+    // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
+    // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
+    // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+    // TODO : support first/last row, and constraining reads to rows we know are populated
+    static class MultiRowIterator extends PartitionIterator
+    {
+
+        // probability any single row will be generated in this iteration
+        double useChance;
+
+        // the seed used to generate the current values for the clustering components at each depth;
+        // used to save recalculating it for each row, so we only need to recalc from prior row.
+        final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
+        // the components remaining to be visited for each level of the current stack
+        final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+
+        // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
+        // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
+        final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+        final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // track where in the partition we are, and where we are limited to
+        final int[] position = new int[generator.clusteringComponents.size()];
+        final int[] limit = new int[position.length];
+        boolean hasNext, isFirstWrite, isWrite;
+
+        // reusable collections for generating unique and sorted clustering components
+        final Set<Object> unique = new HashSet<>();
+        final List<Object> tosort = new ArrayList<>();
+
+        MultiRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+            for (int i = 0 ; i < clusteringComponents.length ; i++)
+                clusteringComponents[i] = new ArrayDeque<>();
+            rollmodifier[0] = 1f;
+            chancemodifier[0] = generator.clusteringDescendantAverages[0];
+        }
+
+        // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+        // count to decide how much we should return in one iteration
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            if (this.useChance < 1d)
+            {
+                // we clear our prior roll-modifiers if the use chance was previously less-than zero
+                Arrays.fill(rollmodifier, 1d);
+                Arrays.fill(chancemodifier, 1d);
+            }
+
+            // set the seed for the first clustering component
+            generator.clusteringComponents.get(0).setSeed(idseed);
+
+            // calculate how many first clustering components we'll generate, and how many total rows this predicts
+            int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
+            int expectedRowCount;
+
+            int position = seed.position();
+            isWrite = op.isWrite();
+
+            if (isWrite)
+                expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
+            else if (position != 0)
+                expectedRowCount = setLimit(position);
+            else
+                expectedRowCount = setNoLimit(firstComponentCount);
+
+            if (Double.isNaN(useChance))
+                useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+            this.useChance = useChance;
+
+            while (true)
+            {
+                // TODO: we could avoid repopulating these each loop, by tracking our prior position
+                for (Queue<?> q : clusteringComponents)
+                    q.clear();
+                clusteringSeeds[0] = idseed;
+                fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+
+                // we loop in case we have picked an entirely non-existent range, in which case
+                // we will reset the seed's position, then try again (until we exhaust it or find
+                // some real range) - this only happens for writes, so we only keep this logic in the loop
+
+                if (isWrite)
+                {
+                    position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
+                    isFirstWrite = position == 0;
+                }
+
+                // seek to our start position
+                switch (seek(isWrite ? position : null))
+                {
+                    case END_OF_PARTITION:
+                        return false;
+                    case SUCCESS:
+                        return true;
+                }
+
+                if (!isWrite)
+                    throw new IllegalStateException();
+
+                // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+            }
+        }
+
+        private void decompose(int scalar, int[] decomposed)
+        {
+            for (int i = 0 ; i < decomposed.length ; i++)
+            {
+                int avg = generator.clusteringDescendantAverages[i];
+                decomposed[i] = scalar / avg;
+                scalar %= avg;
+            }
+            for (int i = limit.length - 1 ; i > 0 ; i--)
+            {
+                int avg = generator.clusteringComponentAverages[i];
+                if (decomposed[i] >= avg)
+                {
+                    decomposed[i - 1] += decomposed[i] / avg;
+                    decomposed[i] %= avg;
+                }
+            }
+        }
+
+        private int setNoLimit(int firstComponentCount)
+        {
+            Arrays.fill(limit, Integer.MAX_VALUE);
+            return firstComponentCount * generator.clusteringDescendantAverages[0];
+        }
+
+        private int setLimit(int position)
+        {
+            decompose(position, limit);
+            int expectedRowCount = 0;
+            for (int i = 0 ; i < limit.length ; i++)
+            {
+                int l = limit[i];
+                expectedRowCount += l * generator.clusteringDescendantAverages[i];
+            }
+            return expectedRowCount;
+        }
+
+        static enum State
+        {
+            END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
+        }
+
+        // seek to the provided position (or the first entry if null)
+        private State seek(int scalar)
+        {
+            if (scalar == 0)
+            {
+                this.position[0] = -1;
+                clusteringComponents[0].addFirst(this);
+                return setHasNext(advance(0, true));
+            }
+
+            int[] position = this.position;
+            decompose(scalar, position);
+            boolean incremented = false;
+            for (int i = 0 ; i < position.length ; i++)
+            {
+                if (i != 0)
+                    fill(i);
+                for (int c = position[i] ; c > 0 ; c--)
+                    clusteringComponents[i].poll();
+
+                // we can have started from a position that does not exist, in which
+                // case we need to ascend back up our clustering components, advancing as we go
+                if (clusteringComponents[i].isEmpty())
+                {
+                    int j = i;
+                    while (--j >= 0)
+                    {
+                        clusteringComponents[j].poll();
+                        if (!clusteringComponents[j].isEmpty())
+                            break;
+                    }
+
+                    // if we've exhausted the whole partition, we're done
+                    if (j < 0)
+                        return setHasNext(false);
+
+                    // we don't check here to see if we've exceeded our limit,
+                    // because if we came to a non-existent position and generated a limit
+                    // we want to at least find the next real position, and set it on the seed
+                    // in this case we do then yield false and select a different seed to continue with
+                    position[j]++;
+                    Arrays.fill(position, j + 1, position.length, 0);
+                    while (j < i)
+                        fill(++j);
+                    incremented = true;
+                }
+                if (clusteringComponents[i].isEmpty())
+                    throw new IllegalStateException();
+                row.row[i] = clusteringComponents[i].peek();
+            }
+
+            if (incremented && compareToLastRow() > 0)
+                return setHasNext(false);
+
+            position[position.length - 1]--;
+            // call advance so we honour any select chance
+            clusteringComponents[position.length - 1].addFirst(this);
+
+            return setHasNext(advance(position.length - 1, true));
+        }
+
+        // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+        // to move the iterator to the next item
+        void advance()
+        {
+            // we are always at the leaf level when this method is invoked
+            // so we calculate the seed for generating the row by combining the seed that generated the clustering components
+            int depth = clusteringComponents.length - 1;
+            long parentSeed = clusteringSeeds[depth];
+            long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+            // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
+            for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
+                gen.setSeed(rowSeed);
+                row.row[i] = gen.generate();
+            }
+
+            // then we advance the leaf level
+            setHasNext(advance(depth, false));
+        }
+
+        private boolean advance(int depth, boolean first)
+        {
+            // advance the leaf component
+            clusteringComponents[depth].poll();
+            position[depth]++;
+            while (true)
+            {
+                if (clusteringComponents[depth].isEmpty())
+                {
+                    // if we've run out of clustering components at this level, ascend
+                    if (depth == 0)
+                        return false;
+                    depth--;
+                    clusteringComponents[depth].poll();
+                    if (++position[depth] > limit[depth])
+                        return false;
+                    continue;
+                }
+
+                int compareToLastRow = compareToLastRow();
+                if (compareToLastRow > 0 && !first)
+                    return false;
+                boolean forceReturnOne = first && compareToLastRow >= 0;
+
+                // the chance of descending is the uniform usechance, multiplied by the number of children
+                // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
+                // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
+                // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
+                // chance of beating this next roll
+                double thischance = useChance * chancemodifier[depth];
+                if (forceReturnOne || thischance > 0.99999f || thischance >= random.nextDouble())
+                {
+                    // if we're descending, we fill in our clustering component and increase our depth
+                    row.row[depth] = clusteringComponents[depth].peek();
+                    depth++;
+                    if (depth == clusteringComponents.length)
+                        return true;
+                    // if we haven't reached the leaf, we update our probability statistics, fill in all of
+                    // this level's clustering components, and repeat
+                    if (useChance < 1d)
+                    {
+                        rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+                        chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
+                    }
+                    position[depth] = 0;
+                    fill(depth);
+                    continue;
+                }
+
+                if (compareToLastRow >= 0)
+                    return false;
+
+                // if we don't descend, we remove the clustering suffix we've skipped and continue
+                clusteringComponents[depth].poll();
+                position[depth]++;
+            }
+        }
+
+        private static int compare(int[] a, int[] b)
+        {
+            for (int i = 0 ; i != a.length ; i++)
+                if (a[i] != b[i])
+                    return Integer.compare(a[i], b[i]);
+            return 0;
+        }
+
+        private int compareToLastRow()
+        {
+            int c = position.length - 1;
+            for (int i = 0 ; i <= c ; i++)
+            {
+                int p = position[i], l = limit[i], r = clusteringComponents[i].size();
+                if (i == c && p == l - 1)
+                    return 0;
+                if ((p < l) & (r > 1))
+                    return -1;
+                if (p > l)
+                    return 1;
+            }
+            return 1;
+        }
+
+        // generate the clustering components for the provided depth; requires preceding components
+        // to have been generated and their seeds populated into clusteringSeeds
+        void fill(int depth)
+        {
+            long seed = clusteringSeeds[depth - 1];
+            Generator gen = generator.clusteringComponents.get(depth);
+            gen.setSeed(seed);
+            clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+            fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+        }
+
+        // generate the clustering components into the queue
+        void fill(Queue<Object> queue, int count, Generator generator)
+        {
+            if (count == 1)
+            {
+                queue.add(generator.generate());
+                return;
+            }
+
+            switch (this.generator.order)
+            {
+                case SORTED:
+                    if (Comparable.class.isAssignableFrom(generator.clazz))
+                    {
+                        tosort.clear();
+                        for (int i = 0 ; i < count ; i++)
+                            tosort.add(generator.generate());
+                        Collections.sort((List<Comparable>) (List<?>) tosort);
+                        for (int i = 0 ; i < count ; i++)
+                            if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+                                queue.add(tosort.get(i));
+                        break;
+                    }
+                case ARBITRARY:
+                    unique.clear();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            queue.add(next);
+                    }
+                    break;
+                case SHUFFLED:
+                    unique.clear();
+                    tosort.clear();
+                    ThreadLocalRandom rand = ThreadLocalRandom.current();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            tosort.add(next);
+                    }
+                    for (int i = 0 ; i < tosort.size() ; i++)
+                    {
+                        int index = rand.nextInt(i, tosort.size());
+                        Object obj = tosort.get(index);
+                        tosort.set(index, tosort.get(i));
+                        queue.add(obj);
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        public boolean hasNext()
+        {
+            return hasNext;
+        }
+
+        public Row next()
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            advance();
+            return row;
+        }
+
+        public boolean finishedPartition()
+        {
+            return clusteringComponents[0].isEmpty();
+        }
+
+        private State setHasNext(boolean hasNext)
+        {
+            if (!hasNext)
+            {
+                this.hasNext = false;
+                boolean isLast = finishedPartition();
+                if (isWrite)
+                {
+                    boolean isFirst = isFirstWrite;
+                    if (isFirst)
+                        seedManager.markFirstWrite(seed, isLast);
+                    if (isLast)
+                        seedManager.markLastWrite(seed, isFirst);
+                }
+                return isLast ? State.END_OF_PARTITION : State.AFTER_LIMIT;
+            }
+            this.hasNext = hasNext;
+            return State.SUCCESS;
+        }
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+    // any children of this child
+    static long seed(Object object, AbstractType type, long seed)
+    {
+        if (object instanceof ByteBuffer)
+        {
+            ByteBuffer buf = (ByteBuffer) object;
+            for (int i = buf.position() ; i < buf.limit() ; i++)
+                seed = (31 * seed) + buf.get(i);
+            return seed;
+        }
+        else if (object instanceof String)
+        {
+            String str = (String) object;
+            for (int i = 0 ; i < str.length() ; i++)
+                seed = (31 * seed) + str.charAt(i);
+            return seed;
+        }
+        else if (object instanceof Number)
+        {
+            return (seed * 31) + ((Number) object).longValue();
+        }
+        else if (object instanceof UUID)
+        {
+            return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
+        }
+        else
+        {
+            return seed(type.decompose(object), BytesType.instance, seed);
+        }
+    }
+
+    public Object getPartitionKey(int i)
+    {
+        return partitionKey[i];
+    }
+
+    public String getKeyAsString()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        for (Object key : partitionKey)
+        {
+            if (i > 0)
+                sb.append("|");
+            AbstractType type = generator.partitionKey.get(i++).type;
+            sb.append(type.getString(type.decompose(key)));
+        }
+        return sb.toString();
+    }
+
+    // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
+    public ByteBuffer getToken()
+    {
+        return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9e1a5e8..9713e93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.util.Pair;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory


[2/3] cassandra git commit: cassandra-stress simultaneous inserts over same seed (take two)

Posted by be...@apache.org.
cassandra-stress simultaneous inserts over same seed
(take two)

patch by benedict; reviewed by rstupp CASSANDRA-7964


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/597a1d5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/597a1d5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/597a1d5d

Branch: refs/heads/trunk
Commit: 597a1d5db27ef9e37f7066868e76cc9450fc3c9c
Parents: 51f7cad
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:15 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:15 2014 +0000

----------------------------------------------------------------------
 .../apache/cassandra/stress/WorkManager.java    |  65 ++
 .../stress/generate/PartitionIterator.java      | 632 +++++++++++++++++++
 .../SampledOpDistributionFactory.java           |   1 +
 3 files changed, 698 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
new file mode 100644
index 0000000..c6a3eee
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -0,0 +1,65 @@
+package org.apache.cassandra.stress;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+interface WorkManager
+{
+    // -1 indicates consumer should terminate
+    int takePermits(int count);
+
+    // signal all consumers to terminate
+    void stop();
+
+    static final class FixedWorkManager implements WorkManager
+    {
+
+        final AtomicLong permits;
+
+        public FixedWorkManager(long permits)
+        {
+            this.permits = new AtomicLong(permits);
+        }
+
+        @Override
+        public int takePermits(int count)
+        {
+            while (true)
+            {
+                long cur = permits.get();
+                if (cur == 0)
+                    return -1;
+                count = (int) Math.min(count, cur);
+                long next = cur - count;
+                if (permits.compareAndSet(cur, next))
+                    return count;
+            }
+        }
+
+        @Override
+        public void stop()
+        {
+            permits.getAndSet(0);
+        }
+    }
+
+    static final class ContinuousWorkManager implements WorkManager
+    {
+
+        volatile boolean stop = false;
+
+        @Override
+        public int takePermits(int count)
+        {
+            if (stop)
+                return -1;
+            return count;
+        }
+
+        @Override
+        public void stop()
+        {
+            stop = true;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
new file mode 100644
index 0000000..baab867
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -0,0 +1,632 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.generate.values.Generator;
+
+// a partition is re-used to reduce garbage generation, as is its internal RowIterator
+// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the prefixes)
+public abstract class PartitionIterator implements Iterator<Row>
+{
+
+    // we reuse the row object to save garbage
+    abstract boolean reset(double useChance, int targetCount, Operation op);
+
+    long idseed;
+    Seed seed;
+    final Object[] partitionKey;
+    final PartitionGenerator generator;
+    final SeedManager seedManager;
+    final Row row;
+
+    public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
+    {
+        if (generator.clusteringComponents.size() > 0)
+            return new MultiRowIterator(generator, seedManager);
+        else
+            return new SingleRowIterator(generator, seedManager);
+    }
+
+    private PartitionIterator(PartitionGenerator generator, SeedManager seedManager)
+    {
+        this.generator = generator;
+        this.seedManager = seedManager;
+        this.partitionKey = new Object[generator.partitionKey.size()];
+        this.row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]);
+    }
+
+    private void setSeed(Seed seed)
+    {
+        long idseed = 0;
+        for (int i = 0 ; i < partitionKey.length ; i++)
+        {
+            Generator generator = this.generator.partitionKey.get(i);
+            // set the partition key seed based on the current work item we're processing
+            generator.setSeed(seed.seed);
+            Object key = generator.generate();
+            partitionKey[i] = key;
+            // then contribute this value to the data seed
+            idseed = seed(key, generator.type, idseed);
+        }
+        this.seed = seed;
+        this.idseed = idseed;
+    }
+
+    public boolean reset(Seed seed, double useChance, Operation op)
+    {
+        setSeed(seed);
+        return reset(useChance, 0, op);
+    }
+
+    public boolean reset(Seed seed, int targetCount, Operation op)
+    {
+        setSeed(seed);
+        return reset(Double.NaN, targetCount, op);
+    }
+
+    static class SingleRowIterator extends PartitionIterator
+    {
+        boolean done;
+        boolean isWrite;
+
+        private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+        }
+
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            done = false;
+            isWrite = op.isWrite();
+            return true;
+        }
+
+        public boolean hasNext()
+        {
+            return !done;
+        }
+
+        public Row next()
+        {
+            if (done)
+                throw new NoSuchElementException();
+            for (int i = 0 ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i);
+                gen.setSeed(idseed);
+                row.row[i] = gen.generate();
+            }
+            done = true;
+            if (isWrite)
+            {
+                seedManager.markFirstWrite(seed, true);
+                seedManager.markLastWrite(seed, true);
+            }
+            return row;
+        }
+    }
+
+    // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows.
+    // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level,
+    // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children;
+    // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional
+    // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition.
+    // TODO : support first/last row, and constraining reads to rows we know are populated
+    static class MultiRowIterator extends PartitionIterator
+    {
+
+        // probability any single row will be generated in this iteration
+        double useChance;
+
+        // the seed used to generate the current values for the clustering components at each depth;
+        // used to save recalculating it for each row, so we only need to recalc from prior row.
+        final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
+        // the components remaining to be visited for each level of the current stack
+        final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+
+        // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
+        // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
+        final double[] chancemodifier = new double[generator.clusteringComponents.size()];
+        final double[] rollmodifier = new double[generator.clusteringComponents.size()];
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // track where in the partition we are, and where we are limited to
+        final int[] position = new int[generator.clusteringComponents.size()];
+        final int[] limit = new int[position.length];
+        boolean hasNext, isFirstWrite, isWrite;
+
+        // reusable collections for generating unique and sorted clustering components
+        final Set<Object> unique = new HashSet<>();
+        final List<Object> tosort = new ArrayList<>();
+
+        MultiRowIterator(PartitionGenerator generator, SeedManager seedManager)
+        {
+            super(generator, seedManager);
+            for (int i = 0 ; i < clusteringComponents.length ; i++)
+                clusteringComponents[i] = new ArrayDeque<>();
+            rollmodifier[0] = 1f;
+            chancemodifier[0] = generator.clusteringDescendantAverages[0];
+        }
+
+        // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
+        // count to decide how much we should return in one iteration
+        boolean reset(double useChance, int targetCount, Operation op)
+        {
+            if (this.useChance < 1d)
+            {
+                // we clear our prior roll-modifiers if the use chance was previously less-than zero
+                Arrays.fill(rollmodifier, 1d);
+                Arrays.fill(chancemodifier, 1d);
+            }
+
+            // set the seed for the first clustering component
+            generator.clusteringComponents.get(0).setSeed(idseed);
+
+            // calculate how many first clustering components we'll generate, and how many total rows this predicts
+            int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next();
+            int expectedRowCount;
+
+            int position = seed.position();
+            isWrite = op.isWrite();
+
+            if (isWrite)
+                expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
+            else if (position != 0)
+                expectedRowCount = setLimit(position);
+            else
+                expectedRowCount = setNoLimit(firstComponentCount);
+
+            if (Double.isNaN(useChance))
+                useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
+            this.useChance = useChance;
+
+            while (true)
+            {
+                // TODO: we could avoid repopulating these each loop, by tracking our prior position
+                for (Queue<?> q : clusteringComponents)
+                    q.clear();
+                clusteringSeeds[0] = idseed;
+                fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
+
+                // we loop in case we have picked an entirely non-existent range, in which case
+                // we will reset the seed's position, then try again (until we exhaust it or find
+                // some real range) - this only happens for writes, so we only keep this logic in the loop
+
+                if (isWrite)
+                {
+                    position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
+                    isFirstWrite = position == 0;
+                }
+
+                // seek to our start position
+                switch (seek(isWrite ? position : null))
+                {
+                    case END_OF_PARTITION:
+                        return false;
+                    case SUCCESS:
+                        return true;
+                }
+
+                if (!isWrite)
+                    throw new IllegalStateException();
+
+                // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+            }
+        }
+
+        private void decompose(int scalar, int[] decomposed)
+        {
+            for (int i = 0 ; i < decomposed.length ; i++)
+            {
+                int avg = generator.clusteringDescendantAverages[i];
+                decomposed[i] = scalar / avg;
+                scalar %= avg;
+            }
+            for (int i = limit.length - 1 ; i > 0 ; i--)
+            {
+                int avg = generator.clusteringComponentAverages[i];
+                if (decomposed[i] >= avg)
+                {
+                    decomposed[i - 1] += decomposed[i] / avg;
+                    decomposed[i] %= avg;
+                }
+            }
+        }
+
+        private int setNoLimit(int firstComponentCount)
+        {
+            Arrays.fill(limit, Integer.MAX_VALUE);
+            return firstComponentCount * generator.clusteringDescendantAverages[0];
+        }
+
+        private int setLimit(int position)
+        {
+            decompose(position, limit);
+            int expectedRowCount = 0;
+            for (int i = 0 ; i < limit.length ; i++)
+            {
+                int l = limit[i];
+                expectedRowCount += l * generator.clusteringDescendantAverages[i];
+            }
+            return expectedRowCount;
+        }
+
+        static enum State
+        {
+            END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
+        }
+
+        // seek to the provided position (or the first entry if null)
+        private State seek(int scalar)
+        {
+            if (scalar == 0)
+            {
+                this.position[0] = -1;
+                clusteringComponents[0].addFirst(this);
+                return setHasNext(advance(0, true));
+            }
+
+            int[] position = this.position;
+            decompose(scalar, position);
+            boolean incremented = false;
+            for (int i = 0 ; i < position.length ; i++)
+            {
+                if (i != 0)
+                    fill(i);
+                for (int c = position[i] ; c > 0 ; c--)
+                    clusteringComponents[i].poll();
+
+                // we can have started from a position that does not exist, in which
+                // case we need to ascend back up our clustering components, advancing as we go
+                if (clusteringComponents[i].isEmpty())
+                {
+                    int j = i;
+                    while (--j >= 0)
+                    {
+                        clusteringComponents[j].poll();
+                        if (!clusteringComponents[j].isEmpty())
+                            break;
+                    }
+
+                    // if we've exhausted the whole partition, we're done
+                    if (j < 0)
+                        return setHasNext(false);
+
+                    // we don't check here to see if we've exceeded our limit,
+                    // because if we came to a non-existent position and generated a limit
+                    // we want to at least find the next real position, and set it on the seed
+                    // in this case we do then yield false and select a different seed to continue with
+                    position[j]++;
+                    Arrays.fill(position, j + 1, position.length, 0);
+                    while (j < i)
+                        fill(++j);
+                    incremented = true;
+                }
+                if (clusteringComponents[i].isEmpty())
+                    throw new IllegalStateException();
+                row.row[i] = clusteringComponents[i].peek();
+            }
+
+            if (incremented && compareToLastRow() > 0)
+                return setHasNext(false);
+
+            position[position.length - 1]--;
+            // call advance so we honour any select chance
+            clusteringComponents[position.length - 1].addFirst(this);
+
+            return setHasNext(advance(position.length - 1, true));
+        }
+
+        // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int)
+        // to move the iterator to the next item
+        void advance()
+        {
+            // we are always at the leaf level when this method is invoked
+            // so we calculate the seed for generating the row by combining the seed that generated the clustering components
+            int depth = clusteringComponents.length - 1;
+            long parentSeed = clusteringSeeds[depth];
+            long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed);
+
+            // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver
+            for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
+            {
+                Generator gen = generator.valueComponents.get(i - clusteringSeeds.length);
+                gen.setSeed(rowSeed);
+                row.row[i] = gen.generate();
+            }
+
+            // then we advance the leaf level
+            setHasNext(advance(depth, false));
+        }
+
+        private boolean advance(int depth, boolean first)
+        {
+            // advance the leaf component
+            clusteringComponents[depth].poll();
+            position[depth]++;
+            while (true)
+            {
+                if (clusteringComponents[depth].isEmpty())
+                {
+                    // if we've run out of clustering components at this level, ascend
+                    if (depth == 0)
+                        return false;
+                    depth--;
+                    clusteringComponents[depth].poll();
+                    if (++position[depth] > limit[depth])
+                        return false;
+                    continue;
+                }
+
+                int compareToLastRow = compareToLastRow();
+                if (compareToLastRow > 0 && !first)
+                    return false;
+                boolean forceReturnOne = first && compareToLastRow >= 0;
+
+                // the chance of descending is the uniform usechance, multiplied by the number of children
+                // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
+                // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the
+                // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our
+                // chance of beating this next roll
+                double thischance = useChance * chancemodifier[depth];
+                if (forceReturnOne || thischance > 0.99999f || thischance >= random.nextDouble())
+                {
+                    // if we're descending, we fill in our clustering component and increase our depth
+                    row.row[depth] = clusteringComponents[depth].peek();
+                    depth++;
+                    if (depth == clusteringComponents.length)
+                        return true;
+                    // if we haven't reached the leaf, we update our probability statistics, fill in all of
+                    // this level's clustering components, and repeat
+                    if (useChance < 1d)
+                    {
+                        rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
+                        chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
+                    }
+                    position[depth] = 0;
+                    fill(depth);
+                    continue;
+                }
+
+                if (compareToLastRow >= 0)
+                    return false;
+
+                // if we don't descend, we remove the clustering suffix we've skipped and continue
+                clusteringComponents[depth].poll();
+                position[depth]++;
+            }
+        }
+
+        private static int compare(int[] a, int[] b)
+        {
+            for (int i = 0 ; i != a.length ; i++)
+                if (a[i] != b[i])
+                    return Integer.compare(a[i], b[i]);
+            return 0;
+        }
+
+        private int compareToLastRow()
+        {
+            int c = position.length - 1;
+            for (int i = 0 ; i <= c ; i++)
+            {
+                int p = position[i], l = limit[i], r = clusteringComponents[i].size();
+                if (i == c && p == l - 1)
+                    return 0;
+                if ((p < l) & (r > 1))
+                    return -1;
+                if (p > l)
+                    return 1;
+            }
+            return 1;
+        }
+
+        // generate the clustering components for the provided depth; requires preceding components
+        // to have been generated and their seeds populated into clusteringSeeds
+        void fill(int depth)
+        {
+            long seed = clusteringSeeds[depth - 1];
+            Generator gen = generator.clusteringComponents.get(depth);
+            gen.setSeed(seed);
+            clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+            fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen);
+        }
+
+        // generate the clustering components into the queue
+        void fill(Queue<Object> queue, int count, Generator generator)
+        {
+            if (count == 1)
+            {
+                queue.add(generator.generate());
+                return;
+            }
+
+            switch (this.generator.order)
+            {
+                case SORTED:
+                    if (Comparable.class.isAssignableFrom(generator.clazz))
+                    {
+                        tosort.clear();
+                        for (int i = 0 ; i < count ; i++)
+                            tosort.add(generator.generate());
+                        Collections.sort((List<Comparable>) (List<?>) tosort);
+                        for (int i = 0 ; i < count ; i++)
+                            if (i == 0 || ((Comparable) tosort.get(i - 1)).compareTo(i) < 0)
+                                queue.add(tosort.get(i));
+                        break;
+                    }
+                case ARBITRARY:
+                    unique.clear();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            queue.add(next);
+                    }
+                    break;
+                case SHUFFLED:
+                    unique.clear();
+                    tosort.clear();
+                    ThreadLocalRandom rand = ThreadLocalRandom.current();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            tosort.add(next);
+                    }
+                    for (int i = 0 ; i < tosort.size() ; i++)
+                    {
+                        int index = rand.nextInt(i, tosort.size());
+                        Object obj = tosort.get(index);
+                        tosort.set(index, tosort.get(i));
+                        queue.add(obj);
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        public boolean hasNext()
+        {
+            return hasNext;
+        }
+
+        public Row next()
+        {
+            if (!hasNext())
+                throw new NoSuchElementException();
+            advance();
+            return row;
+        }
+
+        public boolean finishedPartition()
+        {
+            return clusteringComponents[0].isEmpty();
+        }
+
+        private State setHasNext(boolean hasNext)
+        {
+            if (!hasNext)
+            {
+                this.hasNext = false;
+                boolean isLast = finishedPartition();
+                if (isWrite)
+                {
+                    boolean isFirst = isFirstWrite;
+                    if (isFirst)
+                        seedManager.markFirstWrite(seed, isLast);
+                    if (isLast)
+                        seedManager.markLastWrite(seed, isFirst);
+                }
+                return isLast ? State.END_OF_PARTITION : State.AFTER_LIMIT;
+            }
+            this.hasNext = hasNext;
+            return State.SUCCESS;
+        }
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    // calculate a new seed based on the combination of a parent seed and the generated child, to generate
+    // any children of this child
+    static long seed(Object object, AbstractType type, long seed)
+    {
+        if (object instanceof ByteBuffer)
+        {
+            ByteBuffer buf = (ByteBuffer) object;
+            for (int i = buf.position() ; i < buf.limit() ; i++)
+                seed = (31 * seed) + buf.get(i);
+            return seed;
+        }
+        else if (object instanceof String)
+        {
+            String str = (String) object;
+            for (int i = 0 ; i < str.length() ; i++)
+                seed = (31 * seed) + str.charAt(i);
+            return seed;
+        }
+        else if (object instanceof Number)
+        {
+            return (seed * 31) + ((Number) object).longValue();
+        }
+        else if (object instanceof UUID)
+        {
+            return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits());
+        }
+        else
+        {
+            return seed(type.decompose(object), BytesType.instance, seed);
+        }
+    }
+
+    public Object getPartitionKey(int i)
+    {
+        return partitionKey[i];
+    }
+
+    public String getKeyAsString()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        for (Object key : partitionKey)
+        {
+            if (i > 0)
+                sb.append("|");
+            AbstractType type = generator.partitionKey.get(i++).type;
+            sb.append(type.getString(type.decompose(key)));
+        }
+        return sb.toString();
+    }
+
+    // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now
+    public ByteBuffer getToken()
+    {
+        return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/597a1d5d/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9e1a5e8..9713e93 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.util.Pair;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory


[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5876d934
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5876d934
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5876d934

Branch: refs/heads/trunk
Commit: 5876d93424ff49bba9db2de19442992c4dfe641d
Parents: b5795ef 597a1d5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 15:07:34 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 15:07:34 2014 +0000

----------------------------------------------------------------------
 .../apache/cassandra/stress/WorkManager.java    |  65 ++
 .../stress/generate/PartitionIterator.java      | 632 +++++++++++++++++++
 .../SampledOpDistributionFactory.java           |   1 +
 3 files changed, 698 insertions(+)
----------------------------------------------------------------------