You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:31:18 UTC

[2/4] cassandra git commit: Avoid overlap with early compaction replacement

Avoid overlap with early compaction replacement

patch by benedict; reviewed by marcus for CASSANDRA-8683


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/857ee0ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/857ee0ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/857ee0ac

Branch: refs/heads/trunk
Commit: 857ee0ac3765e12140a0e072cee3fd32b1ad1b3d
Parents: 61384c5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Feb 11 15:29:46 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Feb 11 15:29:46 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/DataTracker.java    | 67 +++++++++++++------
 .../cassandra/io/sstable/SSTableReader.java     | 69 ++++++++++++--------
 .../cassandra/io/sstable/SSTableRewriter.java   | 61 ++++++++++++-----
 .../io/sstable/SSTableRewriterTest.java         |  9 +--
 5 files changed, 138 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b323f18..7352068 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
  * Safer Resource Management++ (CASSANDRA-8707)
  * Write partition size estimates into a system table (CASSANDRA-7688)
  * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output

http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index acf9f92..8224311 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -379,6 +379,7 @@ public class DataTracker
                 ImmutableList.<Memtable>of(),
                 Collections.<SSTableReader>emptySet(),
                 Collections.<SSTableReader>emptySet(),
+                Collections.<SSTableReader>emptySet(),
                 SSTableIntervalTree.empty()));
     }
 
@@ -612,10 +613,14 @@ public class DataTracker
         private final List<Memtable> flushingMemtables;
         public final Set<SSTableReader> compacting;
         public final Set<SSTableReader> sstables;
+
+        // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable
+        public final Set<SSTableReader> shadowed;
         public final SSTableIntervalTree intervalTree;
 
-        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
         {
+            this.shadowed = shadowed;
             assert liveMemtables != null;
             assert flushingMemtables != null;
             assert sstables != null;
@@ -664,7 +669,7 @@ public class DataTracker
         View switchMemtable(Memtable newMemtable)
         {
             List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
-            return new View(newLiveMemtables, flushingMemtables, sstables, compacting, intervalTree);
+            return new View(newLiveMemtables, flushingMemtables, sstables, compacting, shadowed, intervalTree);
         }
 
         View markFlushing(Memtable toFlushMemtable)
@@ -691,7 +696,7 @@ public class DataTracker
                                                       .addAll(flushing.subList(i, flushing.size()))
                                                       .build();
 
-            return new View(newLive, newFlushing, sstables, compacting, intervalTree);
+            return new View(newLive, newFlushing, sstables, compacting, shadowed, intervalTree);
         }
 
         View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -701,37 +706,61 @@ public class DataTracker
                                                              .addAll(flushingMemtables.subList(0, index))
                                                              .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
                                                              .build();
-            Set<SSTableReader> newSSTables = newSSTable == null
-                                             ? sstables
-                                             : newSSTables(newSSTable);
-            SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
-            return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, intervalTree);
+            Set<SSTableReader> newSSTables = sstables;
+            SSTableIntervalTree intervalTree = this.intervalTree;
+            if (newSSTable != null)
+            {
+                assert !sstables.contains(newSSTable);
+                assert !shadowed.contains(newSSTable);
+                newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
+                intervalTree = buildIntervalTree(newSSTables);
+            }
+            return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree);
         }
 
         View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
         {
-            Set<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
+            ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
+            int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements);
+            assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
+            Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize);
+            Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
+
+            for (SSTableReader sstable : sstables)
+                if (!oldSet.contains(sstable))
+                    newSSTables.add(sstable);
+
+            for (SSTableReader sstable : shadowed)
+                if (!oldSet.contains(sstable))
+                    newShadowed.add(sstable);
+
+            for (SSTableReader replacement : replacements)
+            {
+                if (replacement.openReason == SSTableReader.OpenReason.SHADOWED)
+                    newShadowed.add(replacement);
+                else
+                    newSSTables.add(replacement);
+            }
+
+            assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
+                String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
+                          newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this);
+            newSSTables = ImmutableSet.copyOf(newSSTables);
+            newShadowed = ImmutableSet.copyOf(newShadowed);
             SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
-            return new View(liveMemtables, flushingMemtables, newSSTables, compacting, intervalTree);
+            return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree);
         }
 
         View markCompacting(Collection<SSTableReader> tomark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
         }
 
         View unmarkCompacting(Iterable<SSTableReader> tounmark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
-        }
-
-        private Set<SSTableReader> newSSTables(SSTableReader newSSTable)
-        {
-            assert newSSTable != null;
-            // not performance-sensitive, don't obsess over doing a selection merge here
-            return newSSTables(Collections.<SSTableReader>emptyList(), Collections.singletonList(newSSTable));
+            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
         }
 
         private Set<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a28eb44..a588bff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -215,7 +215,8 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
         NORMAL,
         EARLY,
         METADATA_CHANGE,
-        MOVED_START
+        MOVED_START,
+        SHADOWED // => MOVED_START past end
     }
 
     public final OpenReason openReason;
@@ -884,42 +885,54 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
         synchronized (tidy.global)
         {
             assert openReason != OpenReason.EARLY;
-
-            if (newStart.compareTo(this.first) > 0)
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
+                                                          maxDataAge, sstableMetadata, OpenReason.MOVED_START);
+            // TODO: make data/index start accurate for compressed files
+            // TODO: merge with caller's firstKeyBeyond() work,to save time
+            if (newStart.compareTo(first) > 0)
             {
-                if (newStart.compareTo(this.last) > 0)
+                final long dataStart = getPosition(newStart, Operator.EQ).position;
+                final long indexStart = getIndexScanPosition(newStart);
+                this.tidy.runOnClose = new Runnable()
                 {
-                    this.tidy.runOnClose = new Runnable()
+                    public void run()
                     {
-                        public void run()
-                        {
-                            CLibrary.trySkipCache(dfile.path, 0, 0);
-                            CLibrary.trySkipCache(ifile.path, 0, 0);
+                        CLibrary.trySkipCache(dfile.path, 0, dataStart);
+                        CLibrary.trySkipCache(ifile.path, 0, indexStart);
+                        if (runOnClose != null)
                             runOnClose.run();
-                        }
-                    };
-                }
-                else
+                    }
+                };
+            }
+
+            replacement.first = newStart;
+            replacement.last = this.last;
+            setReplacedBy(replacement);
+            return replacement;
+        }
+    }
+
+    public SSTableReader cloneAsShadowed(final Runnable runOnClose)
+    {
+        synchronized (tidy.global)
+        {
+            assert openReason != OpenReason.EARLY;
+            this.tidy.runOnClose = new Runnable()
+            {
+                public void run()
                 {
-                    final long dataStart = getPosition(newStart, Operator.GE).position;
-                    final long indexStart = getIndexScanPosition(newStart);
-                    this.tidy.runOnClose = new Runnable()
-                    {
-                        public void run()
-                        {
-                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
-                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
-                            runOnClose.run();
-                        }
-                    };
+                    CLibrary.trySkipCache(dfile.path, 0, 0);
+                    CLibrary.trySkipCache(ifile.path, 0, 0);
+                    runOnClose.run();
                 }
-            }
+            };
 
             SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
                                                           dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
-                                                          maxDataAge, sstableMetadata, OpenReason.MOVED_START);
-            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
-            replacement.last = this.last;
+                                                          maxDataAge, sstableMetadata, OpenReason.SHADOWED);
+            replacement.first = first;
+            replacement.last = last;
             setReplacedBy(replacement);
             return replacement;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 6356d4d..e6e4343 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -168,7 +168,7 @@ public class SSTableRewriter
                     replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
                     currentlyOpenedEarly = reader;
                     currentlyOpenedEarlyAt = writer.getFilePointer();
-                    moveStarts(reader, Functions.constant(reader.last), false);
+                    moveStarts(reader, reader.last, false);
                 }
             }
         }
@@ -177,7 +177,7 @@ public class SSTableRewriter
     public void abort()
     {
         switchWriter(null, true);
-        moveStarts(null, Functions.forMap(originalStarts), true);
+        moveStarts(null, null, true);
 
         // remove already completed SSTables
         for (SSTableReader sstable : finished)
@@ -213,10 +213,10 @@ public class SSTableRewriter
      * instance, we would get exceptions.
      *
      * @param newReader the rewritten reader that replaces them for this region
-     * @param newStarts a function mapping a reader's descriptor to their new start value
+     * @param lowerbound if !reset, must be non-null, and marks the exclusive lowerbound of the start for each sstable
      * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
      */
-    private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+    private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound, boolean reset)
     {
         if (isOffline)
             return;
@@ -229,31 +229,56 @@ public class SSTableRewriter
             for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
                 newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
         }
+
         cachedKeys = new HashMap<>();
-        for (final SSTableReader sstable : rewriting)
+        for (SSTableReader sstable : ImmutableList.copyOf(rewriting))
         {
-            DecoratedKey newStart = newStarts.apply(sstable.descriptor);
-            assert newStart != null;
-            if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+            // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+            // note: only one such writer should be written to at any moment
+            final SSTableReader latest = sstable.getCurrentReplacement();
+            SSTableReader replacement;
+            if (reset)
+            {
+                DecoratedKey newStart = originalStarts.get(sstable.descriptor);
+                replacement = latest.cloneWithNewStart(newStart, null);
+            }
+            else
             {
-                toReplace.add(sstable);
-                // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
-                // note: only one such writer should be written to at any moment
-                replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+                // skip any sstables that we know to already be shadowed
+                if (latest.openReason == SSTableReader.OpenReason.SHADOWED)
+                    continue;
+                if (latest.first.compareTo(lowerbound) > 0)
+                    continue;
+
+                final Runnable runOnClose = new Runnable()
                 {
                     public void run()
                     {
                         // this is somewhat racey, in that we could theoretically be closing this old reader
                         // when an even older reader is still in use, but it's not likely to have any major impact
                         for (DecoratedKey key : invalidateKeys)
-                            sstable.invalidateCacheKey(key);
+                            latest.invalidateCacheKey(key);
                     }
-                }));
+                };
+
+                if (lowerbound.compareTo(latest.last) >= 0)
+                {
+                    replacement = latest.cloneAsShadowed(runOnClose);
+                }
+                else
+                {
+                    DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
+                    assert newStart != null;
+                    replacement = latest.cloneWithNewStart(newStart, runOnClose);
+                }
             }
+
+            toReplace.add(latest);
+            replaceWith.add(replacement);
+            rewriting.remove(sstable);
+            rewriting.add(replacement);
         }
         cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
-        rewriting.removeAll(toReplace);
-        rewriting.addAll(replaceWith);
     }
 
     private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
@@ -292,7 +317,7 @@ public class SSTableRewriter
         {
             SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
-            moveStarts(reader, Functions.constant(reader.last), false);
+            moveStarts(reader, reader.last, false);
             finishedEarly.add(new Finished(writer, reader));
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 2e11624..4957e5a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -147,15 +147,14 @@ public class SSTableRewriterTest extends SchemaLoader
                         if (sstable.openReason == SSTableReader.OpenReason.EARLY)
                         {
                             SSTableReader c = sstables.iterator().next();
-                            long lastKeySize = sstable.getPosition(sstable.last, SSTableReader.Operator.GT).position - sstable.getPosition(sstable.last, SSTableReader.Operator.EQ).position;
                             Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
                             List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
                             List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
                             assertEquals(1, tmplinkPositions.size());
                             assertEquals(1, compactingPositions.size());
                             assertEquals(0, tmplinkPositions.get(0).left.longValue());
-                            // make sure we have one key overlap between the early opened file and the compacting one:
-                            assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left + lastKeySize);
+                            // make sure we have no overlap between the early opened file and the compacting one:
+                            assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left.longValue());
                             assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue());
                         }
                     }
@@ -288,9 +287,11 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         List<SSTableReader> sstables = rewriter.finish();
         assertEquals(files, sstables.size());
-        assertEquals(files + 1, cfs.getSSTables().size());
+        assertEquals(files, cfs.getSSTables().size());
+        assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
         cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         assertEquals(files, cfs.getSSTables().size());
+        assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         validateCFS(cfs);