You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/21 17:52:32 UTC
[1/2] cassandra git commit: Fix cassandra-stress user-mode truncation
of partition generation
Repository: cassandra
Updated Branches:
refs/heads/trunk 37f127897 -> 184bb65fc
Fix cassandra-stress user-mode truncation of partition generation
patch by benedict; reviewed by tjake for CASSANDRA-8608
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1435b9a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1435b9a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1435b9a8
Branch: refs/heads/trunk
Commit: 1435b9a87a4e1878cb35bd6d75e631bf2093e460
Parents: 576a75f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 21 16:50:55 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 21 16:50:55 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 4 +-
.../stress/generate/PartitionIterator.java | 219 ++++++++++---------
3 files changed, 122 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f71269..0c2bab8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
* Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
* Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
* Invalidate prepared BATCH statements when related tables
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index edf3a54..05045f8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -105,9 +105,9 @@ public abstract class Operation
break;
if (spec.useRatio == null)
- success = partitionCache.get(i).reset(seed, spec.targetCount, this);
+ success = partitionCache.get(i).reset(seed, spec.targetCount, isWrite());
else
- success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this);
+ success = partitionCache.get(i).reset(seed, spec.useRatio.next(), isWrite());
}
}
partitionCount = i;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1435b9a8/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 0d0cba1..0466edb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -50,14 +50,16 @@ import org.apache.cassandra.stress.generate.values.Generator;
public abstract class PartitionIterator implements Iterator<Row>
{
- // we reuse the row object to save garbage
- abstract boolean reset(double useChance, int targetCount, Operation op);
+ abstract boolean reset(double useChance, int targetCount, boolean isWrite);
long idseed;
Seed seed;
- final Object[] partitionKey;
+
final PartitionGenerator generator;
final SeedManager seedManager;
+
+ // we reuse these objects to save garbage
+ final Object[] partitionKey;
final Row row;
public static PartitionIterator get(PartitionGenerator generator, SeedManager seedManager)
@@ -93,16 +95,16 @@ public abstract class PartitionIterator implements Iterator<Row>
this.idseed = idseed;
}
- public boolean reset(Seed seed, double useChance, Operation op)
+ public boolean reset(Seed seed, double useChance, boolean isWrite)
{
setSeed(seed);
- return reset(useChance, 0, op);
+ return reset(useChance, 0, isWrite);
}
- public boolean reset(Seed seed, int targetCount, Operation op)
+ public boolean reset(Seed seed, int targetCount, boolean isWrite)
{
setSeed(seed);
- return reset(Double.NaN, targetCount, op);
+ return reset(Double.NaN, targetCount, isWrite);
}
static class SingleRowIterator extends PartitionIterator
@@ -115,10 +117,10 @@ public abstract class PartitionIterator implements Iterator<Row>
super(generator, seedManager);
}
- boolean reset(double useChance, int targetCount, Operation op)
+ boolean reset(double useChance, int targetCount, boolean isWrite)
{
done = false;
- isWrite = op.isWrite();
+ this.isWrite = isWrite;
return true;
}
@@ -155,24 +157,22 @@ public abstract class PartitionIterator implements Iterator<Row>
// TODO : support first/last row, and constraining reads to rows we know are populated
static class MultiRowIterator extends PartitionIterator
{
-
- // probability any single row will be generated in this iteration
- double useChance;
-
// the seed used to generate the current values for the clustering components at each depth;
// used to save recalculating it for each row, so we only need to recalc from prior row.
final long[] clusteringSeeds = new long[generator.clusteringComponents.size()];
// the components remaining to be visited for each level of the current stack
final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()];
+ // probability any single row will be generated in this iteration
+ double useChance;
// we want our chance of selection to be applied uniformly, so we compound the roll we make at each level
// so that we know with what chance we reached there, and we adjust our roll at that level by that amount
final double[] chancemodifier = new double[generator.clusteringComponents.size()];
final double[] rollmodifier = new double[generator.clusteringComponents.size()];
// track where in the partition we are, and where we are limited to
- final int[] position = new int[generator.clusteringComponents.size()];
- final int[] limit = new int[position.length];
+ final int[] currentRow = new int[generator.clusteringComponents.size()];
+ final int[] lastRow = new int[currentRow.length];
boolean hasNext, isFirstWrite, isWrite;
// reusable collections for generating unique and sorted clustering components
@@ -188,10 +188,22 @@ public abstract class PartitionIterator implements Iterator<Row>
chancemodifier[0] = generator.clusteringDescendantAverages[0];
}
- // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit
- // count to decide how much we should return in one iteration
- boolean reset(double useChance, int targetCount, Operation op)
+ /**
+ * initialise the iterator state
+ *
+ * if we're a write, the expected behaviour is that the requested
+ * batch count is compounded with the seed's visit count to decide
+ * how much we should return in one iteration
+ *
+ * @param useChance uniform chance of visiting any single row (NaN if targetCount provided)
+ * @param targetCount number of rows we would like to visit (0 if useChance provided)
+ * @param isWrite true if the action requires write semantics
+ *
+ * @return true if there is data to return, false otherwise
+ */
+ boolean reset(double useChance, int targetCount, boolean isWrite)
{
+ this.isWrite = isWrite;
if (this.useChance < 1d)
{
// we clear our prior roll-modifiers if the use chance was previously less-than one
@@ -207,14 +219,13 @@ public abstract class PartitionIterator implements Iterator<Row>
int expectedRowCount;
int position = seed.position();
- isWrite = op.isWrite();
if (isWrite)
expectedRowCount = firstComponentCount * generator.clusteringDescendantAverages[0];
else if (position != 0)
- expectedRowCount = setLimit(position);
+ expectedRowCount = setLastRow(position - 1);
else
- expectedRowCount = setNoLimit(firstComponentCount);
+ expectedRowCount = setNoLastRow(firstComponentCount);
if (Double.isNaN(useChance))
useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount));
@@ -222,38 +233,84 @@ public abstract class PartitionIterator implements Iterator<Row>
while (true)
{
- // TODO: we could avoid repopulating these each loop, by tracking our prior position
+ // we loop in case we have picked an entirely non-existent range, in which case
+ // we will reset the seed's position, then try again (until we exhaust it or find
+ // some real range)
+
for (Queue<?> q : clusteringComponents)
q.clear();
clusteringSeeds[0] = idseed;
fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0));
- // we loop in case we have picked an entirely non-existent range, in which case
- // we will reset the seed's position, then try again (until we exhaust it or find
- // some real range) - this only happens for writes, so we only keep this logic in the loop
-
- if (isWrite)
+ if (!isWrite)
{
- position = seed.moveForwards(Math.max(1, expectedRowCount / seed.visits));
- isFirstWrite = position == 0;
+ if (seek(0) != State.SUCCESS)
+ throw new IllegalStateException();
+ return true;
}
+
+ int count = Math.max(1, expectedRowCount / seed.visits);
+ position = seed.moveForwards(count);
+ isFirstWrite = position == 0;
+ setLastRow(position + count - 1);
+
// seek to our start position
- switch (seek(isWrite ? position : 0))
+ switch (seek(position))
{
case END_OF_PARTITION:
return false;
case SUCCESS:
return true;
}
+ }
+ }
- if (!isWrite)
- throw new IllegalStateException();
+ // returns expected row count
+ private int setNoLastRow(int firstComponentCount)
+ {
+ Arrays.fill(lastRow, Integer.MAX_VALUE);
+ return firstComponentCount * generator.clusteringDescendantAverages[0];
+ }
- // TODO: recompose our real position into the nearest scalar position, and ensure the seed position is >= this
+ // sets the last row we will visit
+ // returns expected distance from zero
+ private int setLastRow(int position)
+ {
+ if (position < 0)
+ throw new IllegalStateException();
+
+ decompose(position, lastRow);
+ int expectedRowCount = 0;
+ for (int i = 0 ; i < lastRow.length ; i++)
+ {
+ int l = lastRow[i];
+ expectedRowCount += l * generator.clusteringDescendantAverages[i];
}
+ return expectedRowCount + 1;
}
+ // returns 0 if we are currently on the last row we are allocated to visit; 1 if it is after, -1 if it is before
+ // this is defined by _lastRow_, which is wired up from expected (mean) row counts
+ // the last row is where position == lastRow, except the last index is 1 less;
+ // OR if that row does not exist, it is the last row prior to it
+ private int compareToLastRow(int depth)
+ {
+ for (int i = 0 ; i <= depth ; i++)
+ {
+ int p = currentRow[i], l = lastRow[i], r = clusteringComponents[i].size();
+ if ((p == l) | (r == 1))
+ continue;
+ return p - l;
+ }
+ return 0;
+ }
+
+ /**
+ * Translate the scalar position into a tiered position based on mean expected counts
+ * @param scalar scalar position
+ * @param decomposed target container
+ */
private void decompose(int scalar, int[] decomposed)
{
for (int i = 0 ; i < decomposed.length ; i++)
@@ -262,7 +319,7 @@ public abstract class PartitionIterator implements Iterator<Row>
decomposed[i] = scalar / avg;
scalar %= avg;
}
- for (int i = limit.length - 1 ; i > 0 ; i--)
+ for (int i = lastRow.length - 1 ; i > 0 ; i--)
{
int avg = generator.clusteringComponentAverages[i];
if (decomposed[i] >= avg)
@@ -273,42 +330,28 @@ public abstract class PartitionIterator implements Iterator<Row>
}
}
- private int setNoLimit(int firstComponentCount)
- {
- Arrays.fill(limit, Integer.MAX_VALUE);
- return firstComponentCount * generator.clusteringDescendantAverages[0];
- }
-
- private int setLimit(int position)
- {
- decompose(position, limit);
- int expectedRowCount = 0;
- for (int i = 0 ; i < limit.length ; i++)
- {
- int l = limit[i];
- expectedRowCount += l * generator.clusteringDescendantAverages[i];
- }
- return expectedRowCount;
- }
-
static enum State
{
END_OF_PARTITION, AFTER_LIMIT, SUCCESS;
}
- // seek to the provided position (or the first entry if null)
+ /**
+ * seek to the provided position to initialise the iterator
+ *
+ * @param scalar scalar position
+ * @return resultant iterator state
+ */
private State seek(int scalar)
{
if (scalar == 0)
{
- this.position[0] = -1;
+ this.currentRow[0] = -1;
clusteringComponents[0].addFirst(this);
return setHasNext(advance(0, true));
}
- int[] position = this.position;
+ int[] position = this.currentRow;
decompose(scalar, position);
- boolean incremented = false;
for (int i = 0 ; i < position.length ; i++)
{
if (i != 0)
@@ -321,39 +364,36 @@ public abstract class PartitionIterator implements Iterator<Row>
if (clusteringComponents[i].isEmpty())
{
int j = i;
- while (--j >= 0)
+ while (true)
{
+ // if we've exhausted the whole partition, we're done
+ if (--j < 0)
+ return setHasNext(false);
+
clusteringComponents[j].poll();
if (!clusteringComponents[j].isEmpty())
break;
}
- // if we've exhausted the whole partition, we're done
- if (j < 0)
- return setHasNext(false);
-
- // we don't check here to see if we've exceeded our limit,
- // because if we came to a non-existent position and generated a limit
+ // we don't check here to see if we've exceeded our lastRow,
+ // because if we came to a non-existent position and generated a lastRow
// we want to at least find the next real position, and set it on the seed
// in this case we do then yield false and select a different seed to continue with
position[j]++;
Arrays.fill(position, j + 1, position.length, 0);
while (j < i)
fill(++j);
- incremented = true;
}
- if (clusteringComponents[i].isEmpty())
- throw new IllegalStateException();
+
row.row[i] = clusteringComponents[i].peek();
}
- if (incremented && compareToLastRow() > 0)
+ if (compareToLastRow(currentRow.length - 1) > 0)
return setHasNext(false);
- position[position.length - 1]--;
// call advance so we honour any select chance
+ position[position.length - 1]--;
clusteringComponents[position.length - 1].addFirst(this);
-
return setHasNext(advance(position.length - 1, true));
}
@@ -384,7 +424,7 @@ public abstract class PartitionIterator implements Iterator<Row>
ThreadLocalRandom random = ThreadLocalRandom.current();
// advance the leaf component
clusteringComponents[depth].poll();
- position[depth]++;
+ currentRow[depth]++;
while (true)
{
if (clusteringComponents[depth].isEmpty())
@@ -394,15 +434,18 @@ public abstract class PartitionIterator implements Iterator<Row>
return false;
depth--;
clusteringComponents[depth].poll();
- if (++position[depth] > limit[depth])
+ if (++currentRow[depth] > lastRow[depth])
return false;
continue;
}
- int compareToLastRow = compareToLastRow();
- if (compareToLastRow > 0 && !first)
+ int compareToLastRow = compareToLastRow(depth);
+ if (compareToLastRow > 0)
+ {
+ assert !first;
return false;
- boolean forceReturnOne = first && compareToLastRow >= 0;
+ }
+ boolean forceReturnOne = first && compareToLastRow == 0;
// the chance of descending is the uniform usechance, multiplied by the number of children
// we would on average generate (so if we have a 0.1 use chance, but should generate 10 children
@@ -424,7 +467,7 @@ public abstract class PartitionIterator implements Iterator<Row>
rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance);
chancemodifier[depth] = generator.clusteringDescendantAverages[depth] * rollmodifier[depth];
}
- position[depth] = 0;
+ currentRow[depth] = 0;
fill(depth);
continue;
}
@@ -434,32 +477,8 @@ public abstract class PartitionIterator implements Iterator<Row>
// if we don't descend, we remove the clustering suffix we've skipped and continue
clusteringComponents[depth].poll();
- position[depth]++;
- }
- }
-
- private static int compare(int[] a, int[] b)
- {
- for (int i = 0 ; i != a.length ; i++)
- if (a[i] != b[i])
- return Integer.compare(a[i], b[i]);
- return 0;
- }
-
- private int compareToLastRow()
- {
- int c = position.length - 1;
- for (int i = 0 ; i <= c ; i++)
- {
- int p = position[i], l = limit[i], r = clusteringComponents[i].size();
- if (i == c && p == l - 1)
- return 0;
- if ((p < l) & (r > 1))
- return -1;
- if (p > l)
- return 1;
+ currentRow[depth]++;
}
- return 1;
}
// generate the clustering components for the provided depth; requires preceding components
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/184bb65f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/184bb65f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/184bb65f
Branch: refs/heads/trunk
Commit: 184bb65fcafb8603840fbbbda59665dcda275da3
Parents: 37f1278 1435b9a
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 21 16:52:20 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 21 16:52:20 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/stress/Operation.java | 4 +-
.../stress/generate/PartitionIterator.java | 219 ++++++++++---------
3 files changed, 123 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/184bb65f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6d60699,0c2bab8..66b17ce
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,6 +1,60 @@@
+3.0
+ * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
+ * Support direct buffer decompression for reads (CASSANDRA-8464)
+ * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
+ * Add role based access control (CASSANDRA-7653)
+ * Group sstables for anticompaction correctly (CASSANDRA-8578)
+ * Add ReadFailureException to native protocol, respond
+ immediately when replicas encounter errors while handling
+ a read request (CASSANDRA-7886)
+ * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
+ * Allow mixing token and partition key restrictions (CASSANDRA-7016)
+ * Support index key/value entries on map collections (CASSANDRA-8473)
+ * Modernize schema tables (CASSANDRA-8261)
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer
+ APIs (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813, 7708)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
+ * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
* Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
* Invalidate prepared BATCH statements when related tables
or keyspaces are dropped (CASSANDRA-8652)