You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/05/28 19:34:31 UTC

cassandra git commit: Revert refactor of doValidationCompaction which caused breaks from 9431

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 1de8e39ed -> 7d0676215


Revert refactor of doValidationCompaction which caused breaks from 9431


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d067621
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d067621
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d067621

Branch: refs/heads/cassandra-2.2
Commit: 7d0676215ea1f834f9475f18f5853d80b261e29d
Parents: 1de8e39
Author: T Jake Luciani <ja...@apache.org>
Authored: Thu May 28 13:33:41 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Thu May 28 13:34:03 2015 -0400

----------------------------------------------------------------------
 .../db/compaction/CompactionManager.java        | 146 ++++++++++---------
 1 file changed, 74 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d067621/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ffed554..26dab7c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1013,6 +1013,7 @@ public class CompactionManager implements CompactionManagerMBean
      * Performs a readonly "compaction" of all sstables in order to validate complete rows,
      * but without writing the merge result
      */
+    @SuppressWarnings("resource")
     private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException
     {
         // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
@@ -1023,40 +1024,35 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        String snapshotName = validator.desc.sessionId.toString();
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-
-        int gcBefore;
-
-        if (isSnapshotValidation)
+        Refs<SSTableReader> sstables = null;
+        try
         {
-            // If there is a snapshot created for the session then read from there.
-            // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
-            // are supposed to validate.
 
-            try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReader(snapshotName))
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
             {
+                // If there is a snapshot created for the session then read from there.
+                // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
+                // are supposed to validate.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+
                 // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
                 // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
                 // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
                 // 'as good as in the non-snapshot' case)
                 gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-
-                buildMerkleTree(cfs, sstables, validator, gcBefore);
-
-                // review comment: should this be in a try/finally? it was previously
-                cfs.clearSnapshot(snapshotName);
             }
-        }
-        else
-        {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-            try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+            else
             {
-                Refs<SSTableReader> refs = sstableCandidates.refs;
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
                 Set<SSTableReader> sstablesToValidate = new HashSet<>();
+
                 for (SSTableReader sstable : sstableCandidates.sstables)
                 {
                     if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
@@ -1073,72 +1069,78 @@ public class CompactionManager implements CompactionManagerMBean
                     throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                 }
 
-                refs.relaseAllExcept(sstablesToValidate);
+                sstables = Refs.tryRef(sstablesToValidate);
+                if (sstables == null)
+                {
+                    logger.error("Could not reference sstables");
+                    throw new RuntimeException("Could not reference sstables");
+                }
+                sstableCandidates.release();
                 prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
 
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
                     gcBefore = getDefaultGcBefore(cfs);
-
-
-                buildMerkleTree(cfs, refs, validator, gcBefore);
             }
-        }
-    }
 
-    private void buildMerkleTree(ColumnFamilyStore cfs, Refs<SSTableReader> sstables, Validator validator, int gcBefore)
-    {
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
+            {
+                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+            }
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
 
-        long start = System.nanoTime();
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
-        {
-            CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
-            metrics.beginCompaction(ci);
-            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator();)
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                // validate the CF as we iterate over it
-                validator.prepare(cfs, tree);
-                while (iter.hasNext())
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
                 {
-                    if (ci.isStopRequested())
-                        throw new CompactionInterruptedException(ci.getCompactionInfo());
-                    @SuppressWarnings("resource")
-                    AbstractCompactedRow row = iter.next();
-                    validator.add(row);
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
                 }
-                validator.complete();
-            }
-            catch (Exception e)
-            {
-                Throwables.propagate(e);
             }
-            finally
+
+            if (logger.isDebugEnabled())
             {
-                metrics.finishCompaction(ci);
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
             }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                sstables.release();
         }
     }