You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/07/18 15:43:20 UTC

[05/50] cassandra git commit: Avoid missing sstables when getting the canonical sstables

Avoid missing sstables when getting the canonical sstables

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-11996


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc23632f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc23632f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc23632f

Branch: refs/heads/cassandra-3.8
Commit: bc23632f201f760147d8bd1fbee68533fc3f6dfa
Parents: 5b0566a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 13 15:29:08 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 6 07:57:24 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  6 +--
 .../cassandra/db/SizeEstimatesRecorder.java     |  7 +++-
 .../apache/cassandra/db/lifecycle/Tracker.java  |  2 +-
 .../org/apache/cassandra/db/lifecycle/View.java | 30 +++++++-------
 .../apache/cassandra/db/view/ViewBuilder.java   |  4 +-
 .../cassandra/index/SecondaryIndexManager.java  |  2 +-
 .../index/internal/CassandraIndex.java          |  2 +-
 .../io/sstable/IndexSummaryManager.java         |  2 +-
 .../cassandra/streaming/StreamSession.java      |  4 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |  6 +--
 .../index/internal/CustomCassandraIndex.java    |  2 +-
 .../io/sstable/SSTableRewriterTest.java         | 41 +++++++++++++++++++-
 13 files changed, 78 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99ac3ad..b3063b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
  * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
  * Fix column ordering of results with static columns for Thrift requests in
    a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1be3175..b95e88d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1482,7 +1482,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet)
     {
-        return data.getView().sstables(sstableSet);
+        return data.getView().select(sstableSet);
     }
 
     public Iterable<SSTableReader> getUncompactingSSTables()
@@ -1916,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Iterable<DecoratedKey> keySamples(Range<Token> range)
     {
-        try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL)))
+        try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
         {
             Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()];
             int i = 0;
@@ -1930,7 +1930,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long estimatedKeysForRange(Range<Token> range)
     {
-        try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL)))
+        try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
         {
             long count = 0;
             for (SSTableReader sstable : view.sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index 3461aef..0b31b87 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.dht.Range;
@@ -103,8 +104,10 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
             {
                 while (refs == null)
                 {
-                    // note that this is not guaranteed to return all sstables within the ranges, but for an estimated size, that should be fine
-                    Iterable<SSTableReader> canonicalSSTables = table.getTracker().getView().select(SSTableSet.CANONICAL, table.select(View.selectLive(Range.makeRowRange(range))).sstables);
+                    Iterable<SSTableReader> sstables = table.getTracker().getView().select(SSTableSet.CANONICAL);
+                    SSTableIntervalTree tree = SSTableIntervalTree.build(sstables);
+                    Range<PartitionPosition> r = Range.makeRowRange(range);
+                    Iterable<SSTableReader> canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree);
                     refs = Refs.tryRef(canonicalSSTables);
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 16090a1..c94b88f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -401,7 +401,7 @@ public class Tracker
 
     public Iterable<SSTableReader> getUncompacting()
     {
-        return view.get().sstables(SSTableSet.NONCOMPACTING);
+        return view.get().select(SSTableSet.NONCOMPACTING);
     }
 
     public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 96aaa49..3fa197f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -118,14 +118,9 @@ public class View
         return sstables;
     }
 
-    public Iterable<SSTableReader> sstables(SSTableSet sstableSet)
-    {
-        return select(sstableSet, sstables);
-    }
-
     public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter)
     {
-        return select(sstableSet, filter(sstables, filter));
+        return filter(select(sstableSet), filter);
     }
 
     // any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug
@@ -136,7 +131,7 @@ public class View
         return Iterables.concat(sstables, filterOut(compacting, sstables));
     }
 
-    public Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
+    public Iterable<SSTableReader> select(SSTableSet sstableSet)
     {
         switch (sstableSet)
         {
@@ -145,9 +140,18 @@ public class View
             case NONCOMPACTING:
                 return filter(sstables, (s) -> !compacting.contains(s));
             case CANONICAL:
-                return transform(filter(sstables,
-                                        (s) -> s.openReason != SSTableReader.OpenReason.EARLY),
-                                 (s) -> s.openReason != SSTableReader.OpenReason.MOVED_START ? s : compactingMap.get(s));
+                Set<SSTableReader> canonicalSSTables = new HashSet<>();
+                for (SSTableReader sstable : compacting)
+                    if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+                        canonicalSSTables.add(sstable);
+                // reason for checking if compacting contains the sstable is that if compacting has an EARLY version
+                // of a NORMAL sstable, we still have the canonical version of that sstable in sstables.
+                // note that the EARLY version is equal, but not == since it is a different instance of the same sstable.
+                for (SSTableReader sstable : sstables)
+                    if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY)
+                        canonicalSSTables.add(sstable);
+
+                return canonicalSSTables;
             default:
                 throw new IllegalStateException();
         }
@@ -190,7 +194,7 @@ public class View
             return Collections.emptyList();
 
         PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
-        return select(SSTableSet.LIVE, intervalTree.search(Interval.create(left, stopInTree)));
+        return intervalTree.search(Interval.create(left, stopInTree));
     }
 
     public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
@@ -204,9 +208,9 @@ public class View
         return intervalTree.search(Interval.create(left, stopInTree));
     }
 
-    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet)
+    public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet)
     {
-        return (view) -> view.sstables(sstableSet);
+        return (view) -> view.select(sstableSet);
     }
 
     public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index b2b409b..b55eda0 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -108,7 +108,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         if (buildStatus == null)
         {
             baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL);
+            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
             int generation = Integer.MIN_VALUE;
 
             try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
@@ -129,7 +129,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                 @Nullable
                 public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
                 {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view);
+                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
                     if (readers != null)
                         return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
                     return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 9635c59..6dfdeee 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -271,7 +271,7 @@ public class SecondaryIndexManager implements IndexRegistry
     {
         if (index.shouldBuildBlocking())
         {
-            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
                  Refs<SSTableReader> sstables = viewFragment.refs)
             {
                 buildIndexesBlocking(sstables, Collections.singleton(index));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 9d997a7..2a0dec0 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -699,7 +699,7 @@ public abstract class CassandraIndex implements Index
     {
         baseCfs.forceBlockingFlush();
 
-        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
              Refs<SSTableReader> sstables = viewFragment.refs)
         {
             if (sstables.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index aed35c9..ddda430 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -208,7 +208,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
                 do
                 {
                     View view = cfStore.getTracker().getView();
-                    allSSTables = ImmutableSet.copyOf(view.sstables(SSTableSet.CANONICAL));
+                    allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL));
                     nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
                 }
                 while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a14f815..bfbedc7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -329,7 +329,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                     keyRanges.add(Range.makeRowRange(range));
                 refs.addAll(cfStore.selectAndReference(view -> {
                     Set<SSTableReader> sstables = Sets.newHashSet();
-                    SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL));
+                    SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
                     for (Range<PartitionPosition> keyRange : keyRanges)
                     {
                         // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
@@ -346,7 +346,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                     }
 
                     if (logger.isDebugEnabled())
-                        logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
+                        logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL)));
                     return sstables;
                 }).refs);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 98f9300..a5dceca 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -100,8 +100,8 @@ public class ViewTest
         Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
         Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers))));
         Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size());
-        Assert.assertTrue(ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5)));
-        Assert.assertEquals(3, ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).size());
+        Assert.assertTrue(ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5)));
+        Assert.assertEquals(3, ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).size());
 
         // check marking already compacting readers fails with an exception
         testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur);
@@ -129,7 +129,7 @@ public class ViewTest
         testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur);
         Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers))));
         Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size());
-        Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING));
+        Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING));
         Assert.assertTrue(nonCompacting.containsAll(readers.subList(2, 5)));
         Assert.assertTrue(nonCompacting.containsAll(readers.subList(0, 1)));
         Assert.assertEquals(4, nonCompacting.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 6aaefb7..2124abe 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -623,7 +623,7 @@ public class CustomCassandraIndex implements Index
     {
         baseCfs.forceBlockingFlush();
 
-        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
              Refs<SSTableReader> sstables = viewFragment.refs)
         {
             if (sstables.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 18bc760..c842b7f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -759,7 +759,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 if (!checked && writer.currentWriter().getFilePointer() > 15000000)
                 {
                     checked = true;
-                    ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.select(SSTableSet.CANONICAL));
+                    ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.selectFunction(SSTableSet.CANONICAL));
                     // canonical view should have only one SSTable which is not opened early.
                     assertEquals(1, viewFragment.sstables.size());
                     SSTableReader sstable = viewFragment.sstables.get(0);
@@ -872,6 +872,45 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         validateCFS(cfs);
     }
 
+    @Test // regression test added with CASSANDRA-11996: the CANONICAL sstable set must not lose sstables while a compaction is running
+    public void testCanonicalSSTables() throws ExecutionException, InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        cfs.addSSTable(writeFile(cfs, 100)); // start from a single sstable in the store
+        Collection<SSTableReader> allSSTables = cfs.getSSTables();
+        assertEquals(1, allSSTables.size());
+        final AtomicBoolean done = new AtomicBoolean(false); // set by the test thread to stop the polling loop below
+        final AtomicBoolean failed = new AtomicBoolean(false); // set by the polling thread if it ever sees a canonical count other than 1
+        Runnable r = () -> {
+            while (!done.get()) // poll the canonical set continuously until told to stop
+            {
+                Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
+                if (Iterables.size(sstables) != 1)
+                {
+                    failed.set(true);
+                    return; // first bad observation is enough; stop polling
+                }
+            }
+        };
+        Thread t = new Thread(r);
+        try
+        {
+            t.start();
+            cfs.forceMajorCompaction(); // runs on the test thread while the poller reads the canonical set concurrently
+        }
+        finally
+        {
+            done.set(true);
+            t.join(20); // waits at most 20 ms for the poller; NOTE(review): a failure recorded after this timeout would be missed - confirm this is acceptable
+        }
+        assertFalse(failed.get()); // the poller must never have observed a canonical sstable count other than 1
+
+
+    }
+
     private void validateKeys(Keyspace ks)
     {
         for (int i = 0; i < 100; i++)