You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/28 19:35:55 UTC
[1/2] cassandra git commit: Revert refactor of doValidationCompaction
which caused breaks from 9431
Repository: cassandra
Updated Branches:
refs/heads/trunk 74b0b8acc -> 64f1eadd5
Revert refactor of doValidationCompaction which caused breaks from 9431
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d067621
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d067621
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d067621
Branch: refs/heads/trunk
Commit: 7d0676215ea1f834f9475f18f5853d80b261e29d
Parents: 1de8e39
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu May 28 13:33:41 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu May 28 13:34:03 2015 -0400
----------------------------------------------------------------------
.../db/compaction/CompactionManager.java | 146 ++++++++++---------
1 file changed, 74 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d067621/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ffed554..26dab7c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1013,6 +1013,7 @@ public class CompactionManager implements CompactionManagerMBean
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
*/
+ @SuppressWarnings("resource")
private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException
{
// this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
@@ -1023,40 +1024,35 @@ public class CompactionManager implements CompactionManagerMBean
if (!cfs.isValid())
return;
- String snapshotName = validator.desc.sessionId.toString();
- boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-
- int gcBefore;
-
- if (isSnapshotValidation)
+ Refs<SSTableReader> sstables = null;
+ try
{
- // If there is a snapshot created for the session then read from there.
- // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
- // are supposed to validate.
- try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReader(snapshotName))
+ String snapshotName = validator.desc.sessionId.toString();
+ int gcBefore;
+ boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+ if (isSnapshotValidation)
{
+ // If there is a snapshot created for the session then read from there.
+ // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
+ // are supposed to validate.
+ sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+
// Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
// this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
// time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
// 'as good as in the non-snapshot' case)
gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-
- buildMerkleTree(cfs, sstables, validator, gcBefore);
-
- // review comment: should this be in a try/finally? it was previously
- cfs.clearSnapshot(snapshotName);
}
- }
- else
- {
- // flush first so everyone is validating data that is as similar as possible
- StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
- try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+ else
{
- Refs<SSTableReader> refs = sstableCandidates.refs;
+ // flush first so everyone is validating data that is as similar as possible
+ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+ ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
Set<SSTableReader> sstablesToValidate = new HashSet<>();
+
for (SSTableReader sstable : sstableCandidates.sstables)
{
if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
@@ -1073,72 +1069,78 @@ public class CompactionManager implements CompactionManagerMBean
throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
}
- refs.relaseAllExcept(sstablesToValidate);
+ sstables = Refs.tryRef(sstablesToValidate);
+ if (sstables == null)
+ {
+ logger.error("Could not reference sstables");
+ throw new RuntimeException("Could not reference sstables");
+ }
+ sstableCandidates.release();
prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
gcBefore = getDefaultGcBefore(cfs);
-
-
- buildMerkleTree(cfs, refs, validator, gcBefore);
}
- }
- }
- private void buildMerkleTree(ColumnFamilyStore cfs, Refs<SSTableReader> sstables, Validator validator, int gcBefore)
- {
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
- }
- // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+ // Create Merkle tree suitable to hold estimated partitions for given range.
+ // We blindly assume that partition is evenly distributed on all sstables for now.
+ long numPartitions = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+ }
+ // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+ int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+ MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
- long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
- {
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- metrics.beginCompaction(ci);
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator();)
+ long start = System.nanoTime();
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
{
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ metrics.beginCompaction(ci);
+ try
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- @SuppressWarnings("resource")
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ AbstractCompactedRow row = iter.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ if (isSnapshotValidation)
+ {
+ cfs.clearSnapshot(snapshotName);
+ }
+
+ metrics.finishCompaction(ci);
}
- validator.complete();
- }
- catch (Exception e)
- {
- Throwables.propagate(e);
}
- finally
+
+ if (logger.isDebugEnabled())
{
- metrics.finishCompaction(ci);
+ // MT serialize may take time
+ long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+ duration,
+ depth,
+ numPartitions,
+ MerkleTree.serializer.serializedSize(tree, 0),
+ validator.desc);
}
}
-
- if (logger.isDebugEnabled())
+ finally
{
- // MT serialize may take time
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
- duration,
- depth,
- numPartitions,
- MerkleTree.serializer.serializedSize(tree, 0),
- validator.desc);
+ if (sstables != null)
+ sstables.release();
}
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64f1eadd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64f1eadd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64f1eadd
Branch: refs/heads/trunk
Commit: 64f1eadd5de8d2829087d7730516f9406f139c7a
Parents: 74b0b8a 7d06762
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu May 28 13:35:34 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu May 28 13:35:34 2015 -0400
----------------------------------------------------------------------
.../db/compaction/CompactionManager.java | 146 ++++++++++---------
1 file changed, 74 insertions(+), 72 deletions(-)
----------------------------------------------------------------------