You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/03 17:03:41 UTC

[5/5] git commit: Merge branch 'cassandra-2.1' into trunk

Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/Upgrader.java
	src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
	test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f59629c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f59629c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f59629c

Branch: refs/heads/trunk
Commit: 0f59629ce280ba2a74d65a7719dde7cf79923f05
Parents: e60a06c 5160c91
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 3 17:02:10 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 17:02:10 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/DataTracker.java    | 109 ++--
 .../db/compaction/CompactionManager.java        |  29 +-
 .../cassandra/db/compaction/CompactionTask.java |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  12 +-
 .../cassandra/db/compaction/Upgrader.java       |  31 +-
 .../io/sstable/IndexSummaryManager.java         |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 160 +++---
 .../io/sstable/format/SSTableReader.java        |   6 +
 .../db/compaction/AntiCompactionTest.java       |  42 +-
 .../io/sstable/IndexSummaryManagerTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         | 504 +++++++++++++++++++
 13 files changed, 755 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3a8ada2,32083cc..9754110
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,6 +1,39 @@@
 +3.0
 + * Mark sstables as repaired after full repair (CASSANDRA-7586) 
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage through JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455)
 +
 +
  2.1.2
+  * Refactor how we track live size (CASSANDRA-7852)
+  * Make sure unfinished compaction files are removed (CASSANDRA-8124)
   * Fix shutdown when run as Windows service (CASSANDRA-8136)
   * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
   * Fix race in RecoveryManagerTest (CASSANDRA-8176)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ee36cd,84c3cb5..cccb7f9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1045,76 -987,63 +1046,78 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
 +
 +     
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
  
 -            File destination = cfs.directories.getDirectoryForNewSSTables();
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 +        File destination = cfs.directories.getDirectoryForNewSSTables();
-         SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
-         SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
++        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
++        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
  
 -            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 -            {
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 +        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
 +             CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 +        {
 +            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
  
 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                while(iter.hasNext())
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 +
 +            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
 +            Iterator<AbstractCompactedRow> iter = ci.iterator();
 +            while(iter.hasNext())
 +            {
 +                AbstractCompactedRow row = iter.next();
 +                // if current range from sstable is repaired, save it into the new repaired sstable
 +                if (Range.isInRanges(row.key.getToken(), ranges))
                  {
 -                    AbstractCompactedRow row = iter.next();
 -                    // if current range from sstable is repaired, save it into the new repaired sstable
 -                    if (Range.isInRanges(row.key.getToken(), ranges))
 -                    {
 -                        repairedSSTableWriter.append(row);
 -                        repairedKeyCount++;
 -                    }
 -                    // otherwise save into the new 'non-repaired' table
 -                    else
 -                    {
 -                        unRepairedSSTableWriter.append(row);
 -                        unrepairedKeyCount++;
 -                    }
 +                    repairedSSTableWriter.append(row);
 +                    repairedKeyCount++;
 +                }
 +                // otherwise save into the new 'non-repaired' table
 +                else
 +                {
 +                    unRepairedSSTableWriter.append(row);
 +                    unrepairedKeyCount++;
                  }
 -                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 -                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 -                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
 -            }
 -            catch (Throwable e)
 -            {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-             repairedSSTableWriter.finish(false, repairedAt);
-             unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
-             // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
++            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
++            anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
++            anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
++            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
++
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
-             return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
++            return anticompactedSSTables.size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +            repairedSSTableWriter.abort();
 +            unRepairedSSTableWriter.abort();
 +        }
 +        return 0;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 584ff38,b442482..808626b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -158,9 -150,9 +158,9 @@@ public class CompactionTask extends Abs
  
              try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
              {
 -                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
 +                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
                  Iterator<AbstractCompactedRow> iter = ci.iterator();
- 
+                 List<SSTableReader> newSStables;
                  // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                  // replace the old entries.  Track entries to preheat here until then.
                  long minRepairedAt = getMinRepairedAt(actuallyCompact);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java
index c9e7034,39f668d..52739de
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@@ -21,8 -21,9 +21,9 @@@ import java.io.File
  import java.util.*;
  
  import com.google.common.base.Throwables;
+ import com.google.common.collect.Sets;
  
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.*;
@@@ -69,29 -67,24 +68,24 @@@ public class Upgrade
  
          // Get the max timestamp of the precompacted sstables
          // and adds generation of live ancestors
-         // -- note that we always only have one SSTable in toUpgrade here:
-         for (SSTableReader sstable : toUpgrade)
+         sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+         for (Integer i : sstable.getAncestors())
          {
-             sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
-             for (Integer i : sstable.getAncestors())
-             {
-                 if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                     sstableMetadataCollector.addAncestor(i);
-             }
-             sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+             if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                 sstableMetadataCollector.addAncestor(i);
          }
- 
+         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
 -        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
 +        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
      }
  
      public void upgrade()
      {
          outputHandler.output("Upgrading " + sstable);
- 
-         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-         try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+         Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+         try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
          {
 -            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
 +            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
              writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
              while (iter.hasNext())
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 18825cb,4d5a06f..f3d08a6
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -34,11 -35,9 +35,11 @@@ import org.apache.cassandra.db.DataTrac
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
- import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.CLibrary;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  
  /**
   * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb