You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/21 17:51:55 UTC
cassandra git commit: Fix cassandra-stress user-mode truncation of
partition generation
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 576a75f28 -> 1435b9a87
Fix cassandra-stress user-mode truncation of partition generation
patch by benedict; reviewed by tjake for CASSANDRA-8608
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1435b9a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1435b9a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1435b9a8
Branch: refs/heads/cassandra-2.1
Commit: 1435b9a87a4e1878cb35bd6d75e631bf2093e460
Parents: 576a75f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 21 16:50:55 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 21 16:50:55 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 4 +-
.../stress/generate/PartitionIterator.java | 219 ++++++++++---------
3 files changed, 122 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f71269..0c2bab8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
* Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
* Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
* Invalidate prepared BATCH statements when related tables
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index edf3a54..05045f8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -105,9 +105,9 @@ public abstract class Operation
break;
if (spec.useRatio == null)
- success = partitionCache.get(i).reset(seed, spec.targetCount, this);
+ success = partitionCache.get(i).reset(seed, spec.targetCount, isWrite());
else
- success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this);
+ success = partitionCache.get(i).reset(seed, spec.useRatio.next(), isWrite());
}
}
partitionCount = i;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 0d0cba1..0466edb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -50,14 +50,16 @@ import org.apache.cassandra.stress.generate.values.Generator;
public abstract class PartitionIterator implements Iterator<Row>
{
- // we reuse the row object to save garbage
- abstract boolean reset(double useChance, int targetCount, Operation op);
+ abstract boolean reset(double useChance, int targetCount, boolean isWrite);
long idseed;
Seed seed;
- final Object[] partitionKey;
+
final PartitionGenerator generator;
final SeedManager seedManager;
+
+ // we reuse these objects to save garbage
+ final Object[] partitionKey;
final Row row;
public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
@@ -93,16 +95,16 @@ public abstract class PartitionIterator implements Iterator<Row>
this.idseed = idseed;
}
- public boolean reset(Seed seed, double useChance, Operation op)
+ public boolean reset(Seed seed, double useChance, boolean isWrite)
{
setSeed(seed);
- return reset(useChance, 0, op);
+ return reset(useChance, 0, isWrite);
}
- public boolean reset(Seed seed, int targetCount, Operation op)
+ public boolean reset(Seed seed, int targetCount, boolean isWrite)
{
setSeed(seed);
- return reset(Double.NaN, targetCount, op);
+ return reset(Double.NaN, targetCount, isWrite);
}
static class SingleRowIterator extends PartitionIterator
@@ -115,10 +117,10 @@ public abstract class PartitionIterator implements Iterator<Row>
super(generator, seedManager);
}
- boolean reset(double useChance, int targetCount, Operation op)
+ boolean reset(double useChance, int targetCount, boolean isWrite)
{
done = false;
- isWrite = op.isWrite();
+ this.isWrite = isWrite;
return true;
}
@@ -155,24 +157,22 @@ public abstract class PartitionIterator implements Iterator<Row>
// TODO : support first/last row, and constraining reads to rows we know are populated
static class MultiRowIterator extends PartitionIterator
{
-
- // probability any single row will be generated in this iteration
- double useChance;
-
// the seed used to generate the current values for the clustering components at each depth;
// used to save recalculating it for each row, so we only need to recalc from prior row.
final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
// the components remaining to be visited for each level of the current stack
final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+ // probability any single row will be generated in this iteration
+ double useChance;
// we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
// so that we know with what chance we reached there, and we adjust our roll at that level by that amount
final double[] chancemodifier = new double[generator.clusteringComponents.size()];
final double[] rollmodifier = new double[generator.clusteringComponents.size()];
// track where in the partition we are, and where we are limited to
- final int[] position = new int[generator.clusteringComponents.size()];
- final int[] limit = new int[position.length];
+ final int[] currentRow = new int[generator.clusteringComponents.size()];
+ final int[] lastRow = new int[currentRow.length];
boolean hasNext, isFirstWrite, isWrite;
// reusable collections for generating unique and sorted clustering components
@@ -188,10 +188,22 @@ public abstract class PartitionIterator implements Iterator<Row>
chancemodifier[0] = generator.clusteringDescendantAverages[0];
}
- // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
- // count to decide how much we should return in one iteration
- boolean reset(double useChance, int targetCount, Operation op)
+ /**
+ * initialise the iterator state
+ *
+ * if we're a write, the expected behaviour is that the requested
+ * batch count is compounded with the seed's visit count to decide
+ * how much we should return in one iteration
+ *
+ * @param useChance uniform chance of visiting any single row (NaN if targetCount provided)
+ * @param targetCount number of rows we would like to visit (0 if useChance provided)
+ * @param isWrite true if the action requires write semantics
+ *
+ * @return true if there is data to return, false otherwise
+ */
+ boolean reset(double useChance, int targetCount, boolean isWrite)
{
+ this.isWrite = isWrite;
if (this.useChance < 1d)
{
// we clear our prior roll-modifiers if the use chance was previously less-than zero
@@ -207,14 +219,13 @@ public abstract class PartitionIterator implements Iterator<Row>
int expectedRowCount;
int position = seed.position();
- isWrite = op.isWrite();
if (isWrite)
expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
else if (position != 0)
- expectedRowCount = setLimit(position);
+ expectedRowCount = setLastRow(position - 1);
else
- expectedRowCount = setNoLimit(firstComponentCount);
+ expectedRowCount = setNoLastRow(firstComponentCount);
if (Double.isNaN(useChance))
useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
@@ -222,38 +233,84 @@ public abstract class PartitionIterator implements Iterator<Row>
while (true)
{
- // TODO: we could avoid repopulating these each loop, by tracking our prior position
+ // we loop in case we have picked an entirely non-existent range, in which case
+ // we will reset the seed's position, then try again (until we exhaust it or find
+ // some real range)
+
for (Queue<?> q : clusteringComponents)
q.clear();
clusteringSeeds[0] = idseed;
fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
- // we loop in case we have picked an entirely non-existent range, in which case
- // we will reset the seed's position, then try again (until we exhaust it or find
- // some real range) - this only happens for writes, so we only keep this logic in the loop
-
- if (isWrite)
+ if (!isWrite)
{
- position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
- isFirstWrite = position == 0;
+ if (seek(0) != State.SUCCESS)
+ throw new IllegalStateException();
+ return true;
}
+
+ int count = Math.max(1, expectedRowCount / seed.visits);
+ position = seed.moveForwards(count);
+ isFirstWrite = position == 0;
+ setLastRow(position + count - 1);
+
// seek to our start position
- switch (seek(isWrite ? position : 0))
+ switch (seek(position))
{
case END_OF_PARTITION:
return false;
case SUCCESS:
return true;
}
+ }
+ }
- if (!isWrite)
- throw new IllegalStateException();
+ // returns expected row count
+ private int setNoLastRow(int firstComponentCount)
+ {
+ Arrays.fill(lastRow, Integer.MAX_VALUE);
+ return firstComponentCount * generator.clusteringDescendantAverages[0];
+ }
- // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+ // sets the last row we will visit
+ // returns expected distance from zero
+ private int setLastRow(int position)
+ {
+ if (position < 0)
+ throw new IllegalStateException();
+
+ decompose(position, lastRow);
+ int expectedRowCount = 0;
+ for (int i = 0 ; i < lastRow.length ; i++)
+ {
+ int l = lastRow[i];
+ expectedRowCount += l * generator.clusteringDescendantAverages[i];
}
+ return expectedRowCount + 1;
}
+ // returns 0 if we are currently on the last row we are allocated to visit; positive if it is after, negative if it is before
+ // this is defined by _lastRow_, which is wired up from expected (mean) row counts
+ // the last row is where position == lastRow, except the last index is 1 less;
+ // OR if that row does not exist, it is the last row prior to it
+ private int compareToLastRow(int depth)
+ {
+ for (int i = 0 ; i <= depth ; i++)
+ {
+ int p = currentRow[i], l = lastRow[i], r = clusteringComponents[i].size();
+ if ((p == l) | (r == 1))
+ continue;
+ return p - l;
+ }
+ return 0;
+ }
+
+ /**
+ * Translate the scalar position into a tiered position based on mean expected counts
+ * @param scalar scalar position
+ * @param decomposed target container
+ */
private void decompose(int scalar, int[] decomposed)
{
for (int i = 0 ; i < decomposed.length ; i++)
@@ -262,7 +319,7 @@ public abstract class PartitionIterator implements Iterator<Row>
decomposed[i] = scalar / avg;
scalar %= avg;
}
- for (int i = limit.length - 1 ; i > 0 ; i--)
+ for (int i = lastRow.length - 1 ; i > 0 ; i--)
{
int avg = generator.clusteringComponentAverages[i];
if (decomposed[i] >= avg)
@@ -273,42 +330,28 @@ public abstract class PartitionIterator implements Iterator<Row>
}
}
- private int setNoLimit(int firstComponentCount)
- {
- Arrays.fill(limit, Integer.MAX_VALUE);
- return firstComponentCount * generator.clusteringDescendantAverages[0];
- }
-
- private int setLimit(int position)
- {
- decompose(position, limit);
- int expectedRowCount = 0;
- for (int i = 0 ; i < limit.length ; i++)
- {
- int l = limit[i];
- expectedRowCount += l * generator.clusteringDescendantAverages[i];
- }
- return expectedRowCount;
- }
-
static enum State
{
END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
}
- // seek to the provided position (or the first entry if null)
+ /**
+ * seek to the provided position to initialise the iterator
+ *
+ * @param scalar scalar position
+ * @return resultant iterator state
+ */
private State seek(int scalar)
{
if (scalar == 0)
{
- this.position[0] = -1;
+ this.currentRow[0] = -1;
clusteringComponents[0].addFirst(this);
return setHasNext(advance(0, true));
}
- int[] position = this.position;
+ int[] position = this.currentRow;
decompose(scalar, position);
- boolean incremented = false;
for (int i = 0 ; i < position.length ; i++)
{
if (i != 0)
@@ -321,39 +364,36 @@ public abstract class PartitionIterator implements Iterator<Row>
if (clusteringComponents[i].isEmpty())
{
int j = i;
- while (--j >= 0)
+ while (true)
{
+ // if we've exhausted the whole partition, we're done
+ if (--j < 0)
+ return setHasNext(false);
+
clusteringComponents[j].poll();
if (!clusteringComponents[j].isEmpty())
break;
}
- // if we've exhausted the whole partition, we're done
- if (j < 0)
- return setHasNext(false);
-
- // we don't check here to see if we've exceeded our limit,
- // because if we came to a non-existent position and generated a limit
+ // we don't check here to see if we've exceeded our lastRow,
+ // because if we came to a non-existent position and generated a lastRow
// we want to at least find the next real position, and set it on the seed
// in this case we do then yield false and select a different seed to continue with
position[j]++;
Arrays.fill(position, j + 1, position.length, 0);
while (j < i)
fill(++j);
- incremented = true;
}
- if (clusteringComponents[i].isEmpty())
- throw new IllegalStateException();
+
row.row[i] = clusteringComponents[i].peek();
}
- if (incremented && compareToLastRow() > 0)
+ if (compareToLastRow(currentRow.length - 1) > 0)
return setHasNext(false);
- position[position.length - 1]--;
// call advance so we honour any select chance
+ position[position.length - 1]--;
clusteringComponents[position.length - 1].addFirst(this);
-
return setHasNext(advance(position.length - 1, true));
}
@@ -384,7 +424,7 @@ public abstract class PartitionIterator implements Iterator<Row>
ThreadLocalRandom random = ThreadLocalRandom.current();
// advance the leaf component
clusteringComponents[depth].poll();
- position[depth]++;
+ currentRow[depth]++;
while (true)
{
if (clusteringComponents[depth].isEmpty())
@@ -394,15 +434,18 @@ public abstract class PartitionIterator implements Iterator<Row>
return false;
depth--;
clusteringComponents[depth].poll();
- if (++position[depth] > limit[depth])
+ if (++currentRow[depth] > lastRow[depth])
return false;
continue;
}
- int compareToLastRow = compareToLastRow();
- if (compareToLastRow > 0 && !first)
+ int compareToLastRow = compareToLastRow(depth);
+ if (compareToLastRow > 0)
+ {
+ assert !first;
return false;
- boolean forceReturnOne = first && compareToLastRow >= 0;
+ }
+ boolean forceReturnOne = first && compareToLastRow == 0;
// the chance of descending is the uniform usechance, multiplied by the number of children
// we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
@@ -424,7 +467,7 @@ public abstract class PartitionIterator implements Iterator<Row>
rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
}
- position[depth] = 0;
+ currentRow[depth] = 0;
fill(depth);
continue;
}
@@ -434,32 +477,8 @@ public abstract class PartitionIterator implements Iterator<Row>
// if we don't descend, we remove the clustering suffix we've skipped and continue
clusteringComponents[depth].poll();
- position[depth]++;
- }
- }
-
- private static int compare(int[] a, int[] b)
- {
- for (int i = 0 ; i != a.length ; i++)
- if (a[i] != b[i])
- return Integer.compare(a[i], b[i]);
- return 0;
- }
-
- private int compareToLastRow()
- {
- int c = position.length - 1;
- for (int i = 0 ; i <= c ; i++)
- {
- int p = position[i], l = limit[i], r = clusteringComponents[i].size();
- if (i == c && p == l - 1)
- return 0;
- if ((p < l) & (r > 1))
- return -1;
- if (p > l)
- return 1;
+ currentRow[depth]++;
}
- return 1;
}
// generate the clustering components for the provided depth; requires preceding components