You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/03 14:54:15 UTC

[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/Scrubber.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c58af97
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c58af97
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c58af97

Branch: refs/heads/cassandra-2.2
Commit: 5c58af97a742e9d53b8ce4fe930d51b085259dc2
Parents: 8f9ca07 45bd07f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 3 14:48:52 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 3 14:48:52 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                    | 1 +
 .../org/apache/cassandra/db/compaction/CompactionManager.java  | 6 +++---
 src/java/org/apache/cassandra/db/compaction/Scrubber.java      | 4 ++--
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c58af97/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f4e1832,a8cf796..de7cfa8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.0.17
 +2.1.9
 + * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
 + * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
 + * Commit log segment recycling is disabled by default (CASSANDRA-9896)
 + * Add consistency level to tracing output (CASSANDRA-9827)
 + * Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
 + * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
 + * Handle corrupt files on startup (CASSANDRA-9686)
 + * Fix clientutil jar and tests (CASSANDRA-9760)
 + * (cqlsh) Allow the SSL protocol version to be specified through the
 +   config file or environment variables (CASSANDRA-9544)
 + * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 + * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
 + * Ensure atomicity inside thrift and stream session (CASSANDRA-7757)
 +Merged from 2.0:
+  * Don't cast expected bf size to an int (CASSANDRA-9959)
   * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
   * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)
   * Complete CASSANDRA-8448 fix (CASSANDRA-9519)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c58af97/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1e4c54a,9d71dc7..3cfbe43
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -729,44 -531,90 +729,44 @@@ public class CompactionManager implemen
              return;
          }
  
 -        boolean hasIndexes = cfs.indexManager.hasIndexes();
 -        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer);
 +        long start = System.nanoTime();
  
 -        for (SSTableReader sstable : sstables)
 -        {
 -            Set<SSTableReader> sstableAsSet = Collections.singleton(sstable);
 -            if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
 -            {
 -                cfs.replaceCompactedSSTables(sstableAsSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
 -                continue;
 -            }
 -            if (!needsCleanup(sstable, ranges))
 -            {
 -                logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
 -                continue;
 -            }
 +        long totalkeysWritten = 0;
  
-         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
-                                                (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
 -            CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs));
 -            long start = System.nanoTime();
++        long expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
++                                                SSTableReader.getApproximateKeyCount(sstableSet));
 +        if (logger.isDebugEnabled())
 +            logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
  
 -            long totalkeysWritten = 0;
 +        logger.info("Cleaning up {}", sstable);
  
 -            long expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
 -                                                    SSTableReader.getApproximateKeyCount(sstableAsSet, cfs.metadata));
 -            if (logger.isDebugEnabled())
 -                logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 -
 -            logger.info("Cleaning up " + sstable);
 -            File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.CLEANUP));
 -            if (compactionFileLocation == null)
 -                throw new IOException("disk full");
 -
 -            ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
 -            CleanupInfo ci = new CleanupInfo(sstable, scanner);
 -
 -            metrics.beginCompaction(ci);
 -            SSTableWriter writer = createWriter(cfs,
 -                                                compactionFileLocation,
 -                                                expectedBloomFilterSize,
 -                                                sstable);
 -            SSTableReader newSstable = null;
 -            try
 -            {
 -                while (scanner.hasNext())
 -                {
 -                    if (ci.isStopRequested())
 -                        throw new CompactionInterruptedException(ci.getCompactionInfo());
 -                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 -
 -                    row = cleanupStrategy.cleanup(row);
 -                    if (row == null)
 -                        continue;
 -                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
 -                    if (writer.append(compactedRow) != null)
 -                        totalkeysWritten++;
 -                }
 -                if (totalkeysWritten > 0)
 -                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
 -                else
 -                    writer.abort();
 -            }
 -            catch (Throwable e)
 -            {
 -                writer.abort();
 -                throw Throwables.propagate(e);
 -            }
 -            finally
 -            {
 -                controller.close();
 -                scanner.close();
 -                metrics.finishCompaction(ci);
 -            }
 +        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP));
 +        if (compactionFileLocation == null)
 +            throw new IOException("disk full");
 +
 +        ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
 +        CleanupInfo ci = new CleanupInfo(sstable, scanner);
 +
 +        metrics.beginCompaction(ci);
 +        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
 +        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
 +        List<SSTableReader> finished;
 +        try (CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
 +        {
 +            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
  
 -            List<SSTableReader> results = new ArrayList<SSTableReader>(1);
 -            if (newSstable != null)
 +            while (scanner.hasNext())
              {
 -                results.add(newSstable);
 -
 -                String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
 -                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 -                long startsize = sstable.onDiskLength();
 -                long endsize = newSstable.onDiskLength();
 -                double ratio = (double) endsize / (double) startsize;
 -                logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
 +                if (ci.isStopRequested())
 +                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 +
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                row = cleanupStrategy.cleanup(row);
 +                if (row == null)
 +                    continue;
 +                AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
 +                if (writer.append(compactedRow) != null)
 +                    totalkeysWritten++;
              }
  
              // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
@@@ -900,8 -734,7 +900,8 @@@
  
      public static SSTableWriter createWriter(ColumnFamilyStore cfs,
                                               File compactionFileLocation,
-                                              int expectedBloomFilterSize,
+                                              long expectedBloomFilterSize,
 +                                             long repairedAt,
                                               SSTableReader sstable)
      {
          FileUtils.createDirectory(compactionFileLocation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c58af97/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1c12e0,d242264..400df08
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -109,9 -104,8 +109,9 @@@ public class Scrubber implements Closea
              outputHandler.warn("Missing component: " + sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
          }
  
 -        this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
 -                hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub, cfs.metadata) : 0);
 +        this.expectedBloomFilterSize = Math.max(
 +            cfs.metadata.getMinIndexInterval(),
-             hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub)) : 0);
++            hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub) : 0);
  
          // loop through each row, deserializing to check for damage.
          // we'll also loop through the index at the same time, using the position from the index to recover if the