You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/04 17:08:01 UTC

[2/3] cassandra git commit: markCompacting only succeeds if the exact SSTableReader instances being marked are in the live set

markCompacting only succeeds if the exact SSTableReader instances being
marked are in the live set

patch by benedict; reviewed by tjake for CASSANDRA-8689


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec7fba44
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec7fba44
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec7fba44

Branch: refs/heads/trunk
Commit: ec7fba44e613815c472e6cd2744058ba324df056
Parents: f3c0e11
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 16:06:37 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 16:06:37 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  2 +-
 .../org/apache/cassandra/db/DataTracker.java    | 75 ++++++++++--------
 .../SizeTieredCompactionStrategy.java           |  3 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  4 +-
 .../io/sstable/IndexSummaryManagerTest.java     | 82 +++++++++++++++++++-
 .../io/sstable/SSTableRewriterTest.java         |  7 +-
 8 files changed, 136 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b373ae..52f33b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@
  * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
  * IndexSummaryBuilder utilises offheap memory, and shares data between
    each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being
+   marked are in the live set (CASSANDRA-8689)
 Merged from 2.0:
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 38c5dbe..1f03090 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2616,7 +2616,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             public Iterable<SSTableReader> call() throws Exception
             {
                 assert data.getCompacting().isEmpty() : data.getCompacting();
-                Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+                Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
                 if (Iterables.isEmpty(sstables))
                     return Collections.emptyList();
                 boolean success = data.markCompacting(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 81964f9..2c2053c 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
@@ -191,30 +192,32 @@ public class DataTracker
      * unmarkCompacting, but since we will never call markObsolete on a sstable marked
      * as compacting (unless there is a serious bug), we can skip this.
      */
-    public boolean markCompacting(Iterable<SSTableReader> sstables)
+    public boolean markCompacting(Collection<SSTableReader> sstables)
+    {
+        return markCompacting(sstables, false);
+    }
+    public boolean markCompacting(Collection<SSTableReader> sstables, boolean newTables)
     {
         assert sstables != null && !Iterables.isEmpty(sstables);
         while (true)
         {
-            View currentView = view.get();
-            Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
-            Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting);
-            if (inactive.size() < set.size())
+            final View currentView = view.get();
+            if (Iterables.any(sstables, Predicates.in(currentView.compacting)))
                 return false;
 
-            if (Iterables.any(set, new Predicate<SSTableReader>()
+            Predicate live = new Predicate<SSTableReader>()
             {
-                @Override
                 public boolean apply(SSTableReader sstable)
                 {
-                    return sstable.isMarkedCompacted();
+                    return currentView.sstablesMap.get(sstable) == sstable && !sstable.isMarkedCompacted();
                 }
-            }))
-            {
+            };
+            if (newTables)
+                assert !Iterables.any(sstables, Predicates.in(currentView.sstables));
+            else if (!Iterables.all(sstables, live))
                 return false;
-            }
 
-            View newView = currentView.markCompacting(set);
+            View newView = currentView.markCompacting(sstables);
             if (view.compareAndSet(currentView, newView))
                 return true;
         }
@@ -376,12 +379,12 @@ public class DataTracker
     void init()
     {
         view.set(new View(
-                ImmutableList.of(new Memtable(cfstore)),
-                ImmutableList.<Memtable>of(),
-                Collections.<SSTableReader>emptySet(),
-                Collections.<SSTableReader>emptySet(),
-                Collections.<SSTableReader>emptySet(),
-                SSTableIntervalTree.empty()));
+                         ImmutableList.of(new Memtable(cfstore)),
+                         ImmutableList.<Memtable>of(),
+                         Collections.<SSTableReader, SSTableReader>emptyMap(),
+                         Collections.<SSTableReader>emptySet(),
+                         Collections.<SSTableReader>emptySet(),
+                         SSTableIntervalTree.empty()));
     }
 
     /**
@@ -613,12 +616,17 @@ public class DataTracker
         private final List<Memtable> flushingMemtables;
         public final Set<SSTableReader> compacting;
         public final Set<SSTableReader> sstables;
+        // we use a Map here so that we can easily perform identity checks as well as equality checks.
+        // When marking compacting, we now  indicate if we expect the sstables to be present (by default we do),
+        // and we then check that not only are they all present in the live set, but that the exact instance present is
+        // the one we made our decision to compact against.
+        public final Map<SSTableReader, SSTableReader> sstablesMap;
 
         // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable
         public final Set<SSTableReader> shadowed;
         public final SSTableIntervalTree intervalTree;
 
-        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
+        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
         {
             this.shadowed = shadowed;
             assert liveMemtables != null;
@@ -629,7 +637,9 @@ public class DataTracker
 
             this.liveMemtables = liveMemtables;
             this.flushingMemtables = flushingMemtables;
-            this.sstables = sstables;
+
+            this.sstablesMap = sstables;
+            this.sstables = sstablesMap.keySet();
             this.compacting = compacting;
             this.intervalTree = intervalTree;
         }
@@ -669,7 +679,7 @@ public class DataTracker
         View switchMemtable(Memtable newMemtable)
         {
             List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
-            return new View(newLiveMemtables, flushingMemtables, sstables, compacting, shadowed, intervalTree);
+            return new View(newLiveMemtables, flushingMemtables, sstablesMap, compacting, shadowed, intervalTree);
         }
 
         View markFlushing(Memtable toFlushMemtable)
@@ -696,7 +706,7 @@ public class DataTracker
                                                       .addAll(flushing.subList(i, flushing.size()))
                                                       .build();
 
-            return new View(newLive, newFlushing, sstables, compacting, shadowed, intervalTree);
+            return new View(newLive, newFlushing, sstablesMap, compacting, shadowed, intervalTree);
         }
 
         View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -706,14 +716,15 @@ public class DataTracker
                                                              .addAll(flushingMemtables.subList(0, index))
                                                              .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
                                                              .build();
-            Set<SSTableReader> newSSTables = sstables;
+            Map<SSTableReader, SSTableReader> newSSTables = sstablesMap;
             SSTableIntervalTree intervalTree = this.intervalTree;
             if (newSSTable != null)
             {
                 assert !sstables.contains(newSSTable);
                 assert !shadowed.contains(newSSTable);
-                newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
-                intervalTree = buildIntervalTree(newSSTables);
+                newSSTables = ImmutableMap.<SSTableReader, SSTableReader>builder()
+                                          .putAll(sstablesMap).put(newSSTable, newSSTable).build();
+                intervalTree = buildIntervalTree(newSSTables.keySet());
             }
             return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree);
         }
@@ -723,12 +734,12 @@ public class DataTracker
             ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
             int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements);
             assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
-            Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize);
+            Map<SSTableReader, SSTableReader> newSSTables = new HashMap<>(newSSTablesSize);
             Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
 
             for (SSTableReader sstable : sstables)
                 if (!oldSet.contains(sstable))
-                    newSSTables.add(sstable);
+                    newSSTables.put(sstable, sstable);
 
             for (SSTableReader sstable : shadowed)
                 if (!oldSet.contains(sstable))
@@ -739,28 +750,28 @@ public class DataTracker
                 if (replacement.openReason == SSTableReader.OpenReason.SHADOWED)
                     newShadowed.add(replacement);
                 else
-                    newSSTables.add(replacement);
+                    newSSTables.put(replacement, replacement);
             }
 
             assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
                 String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
                           newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this);
-            newSSTables = ImmutableSet.copyOf(newSSTables);
             newShadowed = ImmutableSet.copyOf(newShadowed);
-            SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
+            newSSTables = ImmutableMap.copyOf(newSSTables);
+            SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables.keySet());
             return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree);
         }
 
         View markCompacting(Collection<SSTableReader> tomark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
         }
 
         View unmarkCompacting(Iterable<SSTableReader> tounmark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstablesMap, compactingNew, shadowed, intervalTree);
         }
 
         private Set<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 19abd9c..93484e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -196,7 +197,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(filteredSSTables))
             return null;
-        if (!cfs.getDataTracker().markCompacting(filteredSSTables))
+        if (!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables)))
             return null;
         return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index be1085b..914ce1f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -351,7 +351,7 @@ public class SSTableRewriter
         }
         else
         {
-            dataTracker.markCompacting(Collections.singleton(replaceWith));
+            dataTracker.markCompacting(Collections.singleton(replaceWith), true);
             toReplaceSet = Collections.emptySet();
         }
         dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index ce65d5a..db1758f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -189,7 +189,9 @@ public class SchemaLoader
                                            standardCFMD(ks1, "StandardLowIndexInterval").minIndexInterval(8)
                                                                                         .maxIndexInterval(256)
                                                                                         .caching(CachingOptions.NONE),
-
+                                           standardCFMD(ks1, "StandardRace").minIndexInterval(8)
+                                                                            .maxIndexInterval(256)
+                                                                            .caching(CachingOptions.NONE),
                                            standardCFMD(ks1, "UUIDKeys").keyValidator(UUIDType.instance),
                                            CFMetaData.denseCFMetaData(ks1, "MixedTypes", LongType.instance).keyValidator(UUIDType.instance).defaultValidator(BooleanType.instance),
                                            CFMetaData.denseCFMetaData(ks1, "MixedTypesComposite", composite).keyValidator(composite).defaultValidator(BooleanType.instance),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 0bb9d5f..dec7705 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -20,22 +20,27 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import junit.framework.Assert;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
@@ -46,6 +51,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(OrderedJUnit4ClassRunner.class)
 public class IndexSummaryManagerTest extends SchemaLoader
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryManagerTest.class);
@@ -83,7 +89,6 @@ public class IndexSummaryManagerTest extends SchemaLoader
         long total = 0;
         for (SSTableReader sstable : sstables)
             total += sstable.getIndexSummaryOffHeapSize();
-
         return total;
     }
 
@@ -494,4 +499,73 @@ public class IndexSummaryManagerTest extends SchemaLoader
                 assertTrue(entry.getValue() >= cfs.metadata.getMinIndexInterval());
         }
     }
+
+    //This test runs last, since cleaning up compactions and tp is a pain
+    @Test
+    public void testCompactionRace() throws InterruptedException, ExecutionException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardRace"; // index interval of 8, no key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        int numSSTables = 20;
+        int numRows = 28;
+        createSSTables(ksname, cfname, numSSTables, numRows);
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+
+        ExecutorService tp = Executors.newFixedThreadPool(2);
+
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        for (int i = 0; i < 2; i++)
+        {
+            tp.submit(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    while(!failed.get())
+                    {
+                        try
+                        {
+                            IndexSummaryManager.instance.redistributeSummaries();
+                        } catch (Throwable e)
+                        {
+                            failed.set(true);
+                        }
+                    }
+                }
+            });
+        }
+
+        while ( cfs.getSSTables().size() != 1 )
+            cfs.forceMajorCompaction();
+
+        try
+        {
+            Assert.assertFalse(failed.get());
+
+            for (SSTableReader sstable : sstables)
+            {
+                Assert.assertEquals(true, sstable.isMarkedCompacted());
+            }
+
+            Assert.assertEquals(20, sstables.size());
+
+            try
+            {
+                totalOffHeapSize(sstables);
+                Assert.fail("This should have failed");
+            } catch (AssertionError e)
+            {
+
+            }
+        }
+        finally
+        {
+            tp.shutdownNow();
+            CompactionManager.instance.finishCompactionsAndShutdown(10, TimeUnit.SECONDS);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7fba44/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 258b6b5..6c96905 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -540,7 +540,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
         SSTableReader s = writeFile(cfs, 1000);
-        cfs.getDataTracker().markCompacting(Arrays.asList(s));
+        cfs.getDataTracker().markCompacting(Arrays.asList(s), true);
         SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
         splitter.split();
         Thread.sleep(1000);
@@ -584,6 +584,7 @@ public class SSTableRewriterTest extends SchemaLoader
         if (!offline)
             cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
+        cfs.getDataTracker().markCompacting(compacting);
         SSTableRewriter.overrideOpenInterval(10000000);
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
@@ -608,6 +609,10 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.abort();
             }
         }
+        finally
+        {
+            cfs.getDataTracker().unmarkCompacting(compacting);
+        }
         Thread.sleep(1000);
         int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
         assertEquals(filecount, 1);