You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/21 17:52:32 UTC

[1/2] cassandra git commit: Fix cassandra-stress user-mode truncation of partition generation

Repository: cassandra
Updated Branches:
  refs/heads/trunk 37f127897 -> 184bb65fc


Fix cassandra-stress user-mode truncation of partition generation

patch by benedict; reviewed by tjake for CASSANDRA-8608


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1435b9a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1435b9a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1435b9a8

Branch: refs/heads/trunk
Commit: 1435b9a87a4e1878cb35bd6d75e631bf2093e460
Parents: 576a75f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 21 16:50:55 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 21 16:50:55 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/stress/Operation.java  |   4 +-
 .../stress/generate/PartitionIterator.java      | 219 ++++++++++---------
 3 files changed, 122 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f71269..0c2bab8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
  * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
  * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
  * Invalidate prepared BATCH statements when related tables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index edf3a54..05045f8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -105,9 +105,9 @@ public abstract class Operation
                     break;
 
                 if (spec.useRatio == null)
-                    success = partitionCache.get(i).reset(seed, spec.targetCount, this);
+                    success = partitionCache.get(i).reset(seed, spec.targetCount, isWrite());
                 else
-                    success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this);
+                    success = partitionCache.get(i).reset(seed, spec.useRatio.next(), isWrite());
             }
         }
         partitionCount = i;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 0d0cba1..0466edb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -50,14 +50,16 @@ import org.apache.cassandra.stress.generate.values.Generator;
 public abstract class PartitionIterator implements Iterator<Row>
 {
 
-    // we reuse the row object to save garbage
-    abstract boolean reset(double useChance, int targetCount, Operation op);
+    abstract boolean reset(double useChance, int targetCount, boolean isWrite);
 
     long idseed;
     Seed seed;
-    final Object[] partitionKey;
+
     final PartitionGenerator generator;
     final SeedManager seedManager;
+
+    // we reuse these objects to save garbage
+    final Object[] partitionKey;
     final Row row;
 
     public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
@@ -93,16 +95,16 @@ public abstract class PartitionIterator implements Iterator<Row>
         this.idseed = idseed;
     }
 
-    public boolean reset(Seed seed, double useChance, Operation op)
+    public boolean reset(Seed seed, double useChance, boolean isWrite)
     {
         setSeed(seed);
-        return reset(useChance, 0, op);
+        return reset(useChance, 0, isWrite);
     }
 
-    public boolean reset(Seed seed, int targetCount, Operation op)
+    public boolean reset(Seed seed, int targetCount, boolean isWrite)
     {
         setSeed(seed);
-        return reset(Double.NaN, targetCount, op);
+        return reset(Double.NaN, targetCount, isWrite);
     }
 
     static class SingleRowIterator extends PartitionIterator
@@ -115,10 +117,10 @@ public abstract class PartitionIterator implements Iterator<Row>
             super(generator, seedManager);
         }
 
-        boolean reset(double useChance, int targetCount, Operation op)
+        boolean reset(double useChance, int targetCount, boolean isWrite)
         {
             done = false;
-            isWrite = op.isWrite();
+            this.isWrite = isWrite;
             return true;
         }
 
@@ -155,24 +157,22 @@ public abstract class PartitionIterator implements Iterator<Row>
     // TODO : support first/last row, and constraining reads to rows we know are populated
     static class MultiRowIterator extends PartitionIterator
     {
-
-        // probability any single row will be generated in this iteration
-        double useChance;
-
         // the seed used to generate the current values for the clustering components at each depth;
         // used to save recalculating it for each row, so we only need to recalc from prior row.
         final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
         // the components remaining to be visited for each level of the current stack
         final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
 
+        // probability any single row will be generated in this iteration
+        double useChance;
         // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
         // so that we know with what chance we reached there, and we adjust our roll at that level by that amount
         final double[] chancemodifier = new double[generator.clusteringComponents.size()];
         final double[] rollmodifier = new double[generator.clusteringComponents.size()];
 
         // track where in the partition we are, and where we are limited to
-        final int[] position = new int[generator.clusteringComponents.size()];
-        final int[] limit = new int[position.length];
+        final int[] currentRow = new int[generator.clusteringComponents.size()];
+        final int[] lastRow = new int[currentRow.length];
         boolean hasNext, isFirstWrite, isWrite;
 
         // reusable collections for generating unique and sorted clustering components
@@ -188,10 +188,22 @@ public abstract class PartitionIterator implements Iterator<Row>
             chancemodifier[0] = generator.clusteringDescendantAverages[0];
         }
 
-        // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
-        // count to decide how much we should return in one iteration
-        boolean reset(double useChance, int targetCount, Operation op)
+        /**
+         * initialise the iterator state
+         *
+         * if we're a write, the expected behaviour is that the requested
+         * batch count is compounded with the seed's visit count to decide
+         * how much we should return in one iteration
+         *
+         * @param useChance uniform chance of visiting any single row (NaN if targetCount provided)
+         * @param targetCount number of rows we would like to visit (0 if useChance provided)
+         * @param isWrite true if the action requires write semantics
+         *
+         * @return true if there is data to return, false otherwise
+         */
+        boolean reset(double useChance, int targetCount, boolean isWrite)
         {
+            this.isWrite = isWrite;
             if (this.useChance < 1d)
             {
                 // we clear our prior roll-modifiers if the use chance was previously less than one
@@ -207,14 +219,13 @@ public abstract class PartitionIterator implements Iterator<Row>
             int expectedRowCount;
 
             int position = seed.position();
-            isWrite = op.isWrite();
 
             if (isWrite)
                 expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
             else if (position != 0)
-                expectedRowCount = setLimit(position);
+                expectedRowCount = setLastRow(position - 1);
             else
-                expectedRowCount = setNoLimit(firstComponentCount);
+                expectedRowCount = setNoLastRow(firstComponentCount);
 
             if (Double.isNaN(useChance))
                 useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
@@ -222,38 +233,84 @@ public abstract class PartitionIterator implements Iterator<Row>
 
             while (true)
             {
-                // TODO: we could avoid repopulating these each loop, by tracking our prior position
+                // we loop in case we have picked an entirely non-existent range, in which case
+                // we will reset the seed's position, then try again (until we exhaust it or find
+                // some real range)
+
                 for (Queue<?> q : clusteringComponents)
                     q.clear();
                 clusteringSeeds[0] = idseed;
                 fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
 
-                // we loop in case we have picked an entirely non-existent range, in which case
-                // we will reset the seed's position, then try again (until we exhaust it or find
-                // some real range) - this only happens for writes, so we only keep this logic in the loop
-
-                if (isWrite)
+                if (!isWrite)
                 {
-                    position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
-                    isFirstWrite = position == 0;
+                    if (seek(0) != State.SUCCESS)
+                        throw new IllegalStateException();
+                    return true;
                 }
 
+
+                int count = Math.max(1, expectedRowCount / seed.visits);
+                position = seed.moveForwards(count);
+                isFirstWrite = position == 0;
+                setLastRow(position + count - 1);
+
                 // seek to our start position
-                switch (seek(isWrite ? position : 0))
+                switch (seek(position))
                 {
                     case END_OF_PARTITION:
                         return false;
                     case SUCCESS:
                         return true;
                 }
+            }
+        }
 
-                if (!isWrite)
-                    throw new IllegalStateException();
+        // returns expected row count
+        private int setNoLastRow(int firstComponentCount)
+        {
+            Arrays.fill(lastRow, Integer.MAX_VALUE);
+            return firstComponentCount * generator.clusteringDescendantAverages[0];
+        }
 
-                // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+        // sets the last row we will visit
+        // returns expected distance from zero
+        private int setLastRow(int position)
+        {
+            if (position < 0)
+                throw new IllegalStateException();
+
+            decompose(position, lastRow);
+            int expectedRowCount = 0;
+            for (int i = 0 ; i < lastRow.length ; i++)
+            {
+                int l = lastRow[i];
+                expectedRowCount += l * generator.clusteringDescendantAverages[i];
             }
+            return expectedRowCount + 1;
         }
 
+        // returns 0 if we are currently on the last row we are allocated to visit; positive if it is after, negative if it is before
+        // this is defined by lastRow, which is wired up from expected (mean) row counts
+        // the last row is where position == lastRow, except the last index is 1 less;
+        // OR if that row does not exist, it is the last row prior to it
+        private int compareToLastRow(int depth)
+        {
+            for (int i = 0 ; i <= depth ; i++)
+            {
+                int p = currentRow[i], l = lastRow[i], r = clusteringComponents[i].size();
+                if ((p == l) | (r == 1))
+                    continue;
+                return p - l;
+            }
+            return 0;
+        }
+
+        /**
+         * Translate the scalar position into a tiered position based on mean expected counts
+         * @param scalar scalar position
+         * @param decomposed target container
+         */
         private void decompose(int scalar, int[] decomposed)
         {
             for (int i = 0 ; i < decomposed.length ; i++)
@@ -262,7 +319,7 @@ public abstract class PartitionIterator implements Iterator<Row>
                 decomposed[i] = scalar / avg;
                 scalar %= avg;
             }
-            for (int i = limit.length - 1 ; i > 0 ; i--)
+            for (int i = lastRow.length - 1 ; i > 0 ; i--)
             {
                 int avg = generator.clusteringComponentAverages[i];
                 if (decomposed[i] >= avg)
@@ -273,42 +330,28 @@ public abstract class PartitionIterator implements Iterator<Row>
             }
         }
 
-        private int setNoLimit(int firstComponentCount)
-        {
-            Arrays.fill(limit, Integer.MAX_VALUE);
-            return firstComponentCount * generator.clusteringDescendantAverages[0];
-        }
-
-        private int setLimit(int position)
-        {
-            decompose(position, limit);
-            int expectedRowCount = 0;
-            for (int i = 0 ; i < limit.length ; i++)
-            {
-                int l = limit[i];
-                expectedRowCount += l * generator.clusteringDescendantAverages[i];
-            }
-            return expectedRowCount;
-        }
-
         static enum State
         {
             END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
         }
 
-        // seek to the provided position (or the first entry if null)
+        /**
+         * seek to the provided position to initialise the iterator
+         *
+         * @param scalar scalar position
+         * @return resultant iterator state
+         */
         private State seek(int scalar)
         {
             if (scalar == 0)
             {
-                this.position[0] = -1;
+                this.currentRow[0] = -1;
                 clusteringComponents[0].addFirst(this);
                 return setHasNext(advance(0, true));
             }
 
-            int[] position = this.position;
+            int[] position = this.currentRow;
             decompose(scalar, position);
-            boolean incremented = false;
             for (int i = 0 ; i < position.length ; i++)
             {
                 if (i != 0)
@@ -321,39 +364,36 @@ public abstract class PartitionIterator implements Iterator<Row>
                 if (clusteringComponents[i].isEmpty())
                 {
                     int j = i;
-                    while (--j >= 0)
+                    while (true)
                     {
+                        // if we've exhausted the whole partition, we're done
+                        if (--j < 0)
+                            return setHasNext(false);
+
                         clusteringComponents[j].poll();
                         if (!clusteringComponents[j].isEmpty())
                             break;
                     }
 
-                    // if we've exhausted the whole partition, we're done
-                    if (j < 0)
-                        return setHasNext(false);
-
-                    // we don't check here to see if we've exceeded our limit,
-                    // because if we came to a non-existent position and generated a limit
+                    // we don't check here to see if we've exceeded our lastRow,
+                    // because if we came to a non-existent position and generated a lastRow
                     // we want to at least find the next real position, and set it on the seed
                     // in this case we do then yield false and select a different seed to continue with
                     position[j]++;
                     Arrays.fill(position, j + 1, position.length, 0);
                     while (j < i)
                         fill(++j);
-                    incremented = true;
                 }
-                if (clusteringComponents[i].isEmpty())
-                    throw new IllegalStateException();
+
                 row.row[i] = clusteringComponents[i].peek();
             }
 
-            if (incremented && compareToLastRow() > 0)
+            if (compareToLastRow(currentRow.length - 1) > 0)
                 return setHasNext(false);
 
-            position[position.length - 1]--;
             // call advance so we honour any select chance
+            position[position.length - 1]--;
             clusteringComponents[position.length - 1].addFirst(this);
-
             return setHasNext(advance(position.length - 1, true));
         }
 
@@ -384,7 +424,7 @@ public abstract class PartitionIterator implements Iterator<Row>
             ThreadLocalRandom random = ThreadLocalRandom.current();
             // advance the leaf component
             clusteringComponents[depth].poll();
-            position[depth]++;
+            currentRow[depth]++;
             while (true)
             {
                 if (clusteringComponents[depth].isEmpty())
@@ -394,15 +434,18 @@ public abstract class PartitionIterator implements Iterator<Row>
                         return false;
                     depth--;
                     clusteringComponents[depth].poll();
-                    if (++position[depth] > limit[depth])
+                    if (++currentRow[depth] > lastRow[depth])
                         return false;
                     continue;
                 }
 
-                int compareToLastRow = compareToLastRow();
-                if (compareToLastRow > 0 && !first)
+                int compareToLastRow = compareToLastRow(depth);
+                if (compareToLastRow > 0)
+                {
+                    assert !first;
                     return false;
-                boolean forceReturnOne = first && compareToLastRow >= 0;
+                }
+                boolean forceReturnOne = first && compareToLastRow == 0;
 
                 // the chance of descending is the uniform usechance, multiplied by the number of children
                 // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
@@ -424,7 +467,7 @@ public abstract class PartitionIterator implements Iterator<Row>
                         rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
                         chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
                     }
-                    position[depth] = 0;
+                    currentRow[depth] = 0;
                     fill(depth);
                     continue;
                 }
@@ -434,32 +477,8 @@ public abstract class PartitionIterator implements Iterator<Row>
 
                 // if we don't descend, we remove the clustering suffix we've skipped and continue
                 clusteringComponents[depth].poll();
-                position[depth]++;
-            }
-        }
-
-        private static int compare(int[] a, int[] b)
-        {
-            for (int i = 0 ; i != a.length ; i++)
-                if (a[i] != b[i])
-                    return Integer.compare(a[i], b[i]);
-            return 0;
-        }
-
-        private int compareToLastRow()
-        {
-            int c = position.length - 1;
-            for (int i = 0 ; i <= c ; i++)
-            {
-                int p = position[i], l = limit[i], r = clusteringComponents[i].size();
-                if (i == c && p == l - 1)
-                    return 0;
-                if ((p < l) & (r > 1))
-                    return -1;
-                if (p > l)
-                    return 1;
+                currentRow[depth]++;
             }
-            return 1;
         }
 
         // generate the clustering components for the provided depth; requires preceding components


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/184bb65f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/184bb65f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/184bb65f

Branch: refs/heads/trunk
Commit: 184bb65fcafb8603840fbbbda59665dcda275da3
Parents: 37f1278 1435b9a
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 21 16:52:20 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 21 16:52:20 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/stress/Operation.java  |   4 +-
 .../stress/generate/PartitionIterator.java      | 219 ++++++++++---------
 3 files changed, 123 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/184bb65f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6d60699,0c2bab8..66b17ce
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,6 +1,60 @@@
 +3.0
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Add role based access control (CASSANDRA-7653)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
+  * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
   * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
   * Invalidate prepared BATCH statements when related tables
     or keyspaces are dropped (CASSANDRA-8652)