You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/07/06 06:17:29 UTC
[4/9] cassandra git commit: Avoid missing sstables when getting the canonical sstables
Avoid missing sstables when getting the canonical sstables
Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-11996
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc23632f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc23632f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc23632f
Branch: refs/heads/cassandra-3.0
Commit: bc23632f201f760147d8bd1fbee68533fc3f6dfa
Parents: 5b0566a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 13 15:29:08 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 6 07:57:24 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 6 +--
.../cassandra/db/SizeEstimatesRecorder.java | 7 +++-
.../apache/cassandra/db/lifecycle/Tracker.java | 2 +-
.../org/apache/cassandra/db/lifecycle/View.java | 30 +++++++-------
.../apache/cassandra/db/view/ViewBuilder.java | 4 +-
.../cassandra/index/SecondaryIndexManager.java | 2 +-
.../index/internal/CassandraIndex.java | 2 +-
.../io/sstable/IndexSummaryManager.java | 2 +-
.../cassandra/streaming/StreamSession.java | 4 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 6 +--
.../index/internal/CustomCassandraIndex.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 41 +++++++++++++++++++-
13 files changed, 78 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99ac3ad..b3063b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.9
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
* Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
* Fix column ordering of results with static columns for Thrift requests in
a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1be3175..b95e88d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1482,7 +1482,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet)
{
- return data.getView().sstables(sstableSet);
+ return data.getView().select(sstableSet);
}
public Iterable<SSTableReader> getUncompactingSSTables()
@@ -1916,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Iterable<DecoratedKey> keySamples(Range<Token> range)
{
- try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL)))
+ try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
{
Iterable<DecoratedKey>[] samples = new Iterable[view.sstables.size()];
int i = 0;
@@ -1930,7 +1930,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long estimatedKeysForRange(Range<Token> range)
{
- try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL)))
+ try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
{
long count = 0;
for (SSTableReader sstable : view.sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index 3461aef..0b31b87 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
@@ -103,8 +104,10 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
{
while (refs == null)
{
- // note that this is not guaranteed to return all sstables within the ranges, but for an estimated size, that should be fine
- Iterable<SSTableReader> canonicalSSTables = table.getTracker().getView().select(SSTableSet.CANONICAL, table.select(View.selectLive(Range.makeRowRange(range))).sstables);
+ Iterable<SSTableReader> sstables = table.getTracker().getView().select(SSTableSet.CANONICAL);
+ SSTableIntervalTree tree = SSTableIntervalTree.build(sstables);
+ Range<PartitionPosition> r = Range.makeRowRange(range);
+ Iterable<SSTableReader> canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree);
refs = Refs.tryRef(canonicalSSTables);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 16090a1..c94b88f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -401,7 +401,7 @@ public class Tracker
public Iterable<SSTableReader> getUncompacting()
{
- return view.get().sstables(SSTableSet.NONCOMPACTING);
+ return view.get().select(SSTableSet.NONCOMPACTING);
}
public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 96aaa49..3fa197f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -118,14 +118,9 @@ public class View
return sstables;
}
- public Iterable<SSTableReader> sstables(SSTableSet sstableSet)
- {
- return select(sstableSet, sstables);
- }
-
public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter)
{
- return select(sstableSet, filter(sstables, filter));
+ return filter(select(sstableSet), filter);
}
// any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug
@@ -136,7 +131,7 @@ public class View
return Iterables.concat(sstables, filterOut(compacting, sstables));
}
- public Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
+ public Iterable<SSTableReader> select(SSTableSet sstableSet)
{
switch (sstableSet)
{
@@ -145,9 +140,18 @@ public class View
case NONCOMPACTING:
return filter(sstables, (s) -> !compacting.contains(s));
case CANONICAL:
- return transform(filter(sstables,
- (s) -> s.openReason != SSTableReader.OpenReason.EARLY),
- (s) -> s.openReason != SSTableReader.OpenReason.MOVED_START ? s : compactingMap.get(s));
+ Set<SSTableReader> canonicalSSTables = new HashSet<>();
+ for (SSTableReader sstable : compacting)
+ if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+ canonicalSSTables.add(sstable);
+ // reason for checking if compacting contains the sstable is that if compacting has an EARLY version
+ // of a NORMAL sstable, we still have the canonical version of that sstable in sstables.
+ // note that the EARLY version is equal, but not == since it is a different instance of the same sstable.
+ for (SSTableReader sstable : sstables)
+ if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY)
+ canonicalSSTables.add(sstable);
+
+ return canonicalSSTables;
default:
throw new IllegalStateException();
}
@@ -190,7 +194,7 @@ public class View
return Collections.emptyList();
PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
- return select(SSTableSet.LIVE, intervalTree.search(Interval.create(left, stopInTree)));
+ return intervalTree.search(Interval.create(left, stopInTree));
}
public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
@@ -204,9 +208,9 @@ public class View
return intervalTree.search(Interval.create(left, stopInTree));
}
- public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet)
+ public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet)
{
- return (view) -> view.sstables(sstableSet);
+ return (view) -> view.select(sstableSet);
}
public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index b2b409b..b55eda0 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -108,7 +108,7 @@ public class ViewBuilder extends CompactionInfo.Holder
if (buildStatus == null)
{
baseCfs.forceBlockingFlush();
- function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL);
+ function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
int generation = Integer.MIN_VALUE;
try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
@@ -129,7 +129,7 @@ public class ViewBuilder extends CompactionInfo.Holder
@Nullable
public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
{
- Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view);
+ Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
if (readers != null)
return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 9635c59..6dfdeee 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -271,7 +271,7 @@ public class SecondaryIndexManager implements IndexRegistry
{
if (index.shouldBuildBlocking())
{
- try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> sstables = viewFragment.refs)
{
buildIndexesBlocking(sstables, Collections.singleton(index));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 9d997a7..2a0dec0 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -699,7 +699,7 @@ public abstract class CassandraIndex implements Index
{
baseCfs.forceBlockingFlush();
- try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> sstables = viewFragment.refs)
{
if (sstables.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index aed35c9..ddda430 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -208,7 +208,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
do
{
View view = cfStore.getTracker().getView();
- allSSTables = ImmutableSet.copyOf(view.sstables(SSTableSet.CANONICAL));
+ allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL));
nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
}
while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a14f815..bfbedc7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -329,7 +329,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
keyRanges.add(Range.makeRowRange(range));
refs.addAll(cfStore.selectAndReference(view -> {
Set<SSTableReader> sstables = Sets.newHashSet();
- SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL));
+ SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
for (Range<PartitionPosition> keyRange : keyRanges)
{
// keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
@@ -346,7 +346,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
if (logger.isDebugEnabled())
- logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
+ logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL)));
return sstables;
}).refs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 98f9300..a5dceca 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -100,8 +100,8 @@ public class ViewTest
Assert.assertFalse(View.permitCompacting(readers.subList(1, 2)).apply(cur));
Assert.assertTrue(readers.subList(2, 5).containsAll(copyOf(cur.getUncompacting(readers))));
Assert.assertEquals(3, copyOf(cur.getUncompacting(readers)).size());
- Assert.assertTrue(ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5)));
- Assert.assertEquals(3, ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)).size());
+ Assert.assertTrue(ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).containsAll(readers.subList(2, 5)));
+ Assert.assertEquals(3, ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)).size());
// check marking already compacting readers fails with an exception
testFailure(View.updateCompacting(emptySet(), readers.subList(0, 1)), cur);
@@ -129,7 +129,7 @@ public class ViewTest
testFailure(View.updateCompacting(copyOf(readers.subList(0, 2)), emptySet()), cur);
Assert.assertTrue(copyOf(concat(readers.subList(0, 1), readers.subList(2, 5))).containsAll(copyOf(cur.getUncompacting(readers))));
Assert.assertEquals(4, copyOf(cur.getUncompacting(readers)).size());
- Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING));
+ Set<SSTableReader> nonCompacting = ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING));
Assert.assertTrue(nonCompacting.containsAll(readers.subList(2, 5)));
Assert.assertTrue(nonCompacting.containsAll(readers.subList(0, 1)));
Assert.assertEquals(4, nonCompacting.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 6aaefb7..2124abe 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -623,7 +623,7 @@ public class CustomCassandraIndex implements Index
{
baseCfs.forceBlockingFlush();
- try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
Refs<SSTableReader> sstables = viewFragment.refs)
{
if (sstables.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc23632f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 18bc760..c842b7f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -759,7 +759,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
if (!checked && writer.currentWriter().getFilePointer() > 15000000)
{
checked = true;
- ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.select(SSTableSet.CANONICAL));
+ ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.selectFunction(SSTableSet.CANONICAL));
// canonical view should have only one SSTable which is not opened early.
assertEquals(1, viewFragment.sstables.size());
SSTableReader sstable = viewFragment.sstables.get(0);
@@ -872,6 +872,45 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
validateCFS(cfs);
}
+ @Test
+ public void testCanonicalSSTables() throws ExecutionException, InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ truncate(cfs);
+
+ cfs.addSSTable(writeFile(cfs, 100));
+ Collection<SSTableReader> allSSTables = cfs.getSSTables();
+ assertEquals(1, allSSTables.size());
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Runnable r = () -> {
+ while (!done.get())
+ {
+ Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
+ if (Iterables.size(sstables) != 1)
+ {
+ failed.set(true);
+ return;
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ try
+ {
+ t.start();
+ cfs.forceMajorCompaction();
+ }
+ finally
+ {
+ done.set(true);
+ t.join(20);
+ }
+ assertFalse(failed.get());
+
+
+ }
+
private void validateKeys(Keyspace ks)
{
for (int i = 0; i < 100; i++)