You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/04/04 16:48:53 UTC
[7/8] cassandra git commit: Avoid filtering sstables based on
generation when ViewBuilder restarts
Avoid filtering sstables based on generation when ViewBuilder restarts
Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be96c284
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be96c284
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be96c284
Branch: refs/heads/cassandra-3.11
Commit: be96c2840b0a6b269e22bde246b84e8ef4aeef69
Parents: 449400b
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:46:42 2017 -0400
----------------------------------------------------------------------
CHANGES.txt | 4 +-
src/java/org/apache/cassandra/db/view/View.java | 5 ++
.../apache/cassandra/db/view/ViewBuilder.java | 52 +++++++++--------
.../org/apache/cassandra/cql3/ViewTest.java | 59 ++++++++++++++++++++
4 files changed, 95 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e63e266..1ca8733 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
3.11.0
* Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
* cdc column addition strikes again (CASSANDRA-13382)
- * Fix static column indexes (CASSANDRA-13277)
+ * Fix static column indexes (CASSANDRA-13277)
* DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
* unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
* Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
@@ -19,6 +19,8 @@
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
* Address message coalescing regression (CASSANDRA-12676)
Merged from 3.0:
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+ * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
* Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
* Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
* Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0b8de9e..e4c6e02 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -197,7 +197,11 @@ public class View
public ReadQuery getReadQuery()
{
if (query == null)
+ {
query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+ logger.trace("View query: {}", rawSelect);
+ }
+
return query;
}
@@ -205,6 +209,7 @@ public class View
{
if (this.builder != null)
{
+ logger.debug("Stopping current view builder due to schema change");
this.builder.stop();
this.builder = null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 9550e1e..8e647ea 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -72,8 +73,12 @@ public class ViewBuilder extends CompactionInfo.Holder
private void buildKey(DecoratedKey key)
{
ReadQuery selectQuery = view.getReadQuery();
+
if (!selectQuery.selectsKey(key))
+ {
+ logger.trace("Skipping {}, view query filters", key);
return;
+ }
int nowInSec = FBUtilities.nowInSeconds();
SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -96,55 +101,46 @@ public class ViewBuilder extends CompactionInfo.Holder
public void run()
{
+ logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
UUID localHostId = SystemKeyspace.getLocalHostId();
String ksname = baseCfs.metadata.ksName, viewName = view.name;
if (SystemKeyspace.isViewBuilt(ksname, viewName))
{
+ logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
updateDistributed(ksname, viewName, localHostId);
return;
}
Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
Token lastToken;
Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
if (buildStatus == null)
{
- baseCfs.forceBlockingFlush();
- function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
- int generation = Integer.MIN_VALUE;
-
- try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
- {
- for (SSTableReader reader : temp)
- {
- generation = Math.max(reader.descriptor.generation, generation);
- }
- }
-
- SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+ logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
lastToken = null;
+
+ //We don't track the generation number anymore since if a rebuild is stopped and
+ //restarted the max generation filter may yield no sstables due to compactions.
+ //We only care about max generation *during* a build, not across builds.
+ //see CASSANDRA-13405
+ SystemKeyspace.beginViewBuild(ksname, viewName, 0);
}
else
{
- function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
- {
- @Nullable
- public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
- {
- Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
- if (readers != null)
- return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
- return null;
- }
- };
lastToken = buildStatus.right;
+ logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
}
+ baseCfs.forceBlockingFlush();
+ function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
prevToken = lastToken;
+ long keysBuilt = 0;
try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
{
@@ -160,6 +156,7 @@ public class ViewBuilder extends CompactionInfo.Holder
if (range.contains(token))
{
buildKey(key);
+ ++keysBuilt;
if (prevToken == null || prevToken.compareTo(token) != 0)
{
@@ -168,15 +165,21 @@ public class ViewBuilder extends CompactionInfo.Holder
}
}
}
+
lastToken = null;
}
}
if (!isStopped)
{
+ logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
SystemKeyspace.finishViewBuildStatus(ksname, viewName);
updateDistributed(ksname, viewName, localHostId);
}
+ else
+ {
+ logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+ }
}
catch (Exception e)
{
@@ -222,6 +225,7 @@ public class ViewBuilder extends CompactionInfo.Holder
if (lastToken == null || range.contains(lastToken))
rangesLeft = 0;
}
+
return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index ac065e6..6f6e04d 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Uninterruptibles;
+
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.FBUtilities;
@@ -1234,4 +1238,59 @@ public class ViewTest extends CQLTester
{
}
}
+
+ @Test
+ public void testViewBuilderResume() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "k int, " +
+ "c int, " +
+ "val text, " +
+ "PRIMARY KEY(k,c))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ CompactionManager.instance.setCoreCompactorThreads(1);
+ CompactionManager.instance.setMaximumCompactorThreads(1);
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ cfs.disableAutoCompaction();
+
+ for (int i = 0; i < 1024; i++)
+ execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+ cfs.forceBlockingFlush();
+
+ for (int i = 0; i < 1024; i++)
+ execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+ cfs.forceBlockingFlush();
+
+ for (int i = 0; i < 1024; i++)
+ execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+ cfs.forceBlockingFlush();
+
+ for (int i = 0; i < 1024; i++)
+ execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+ cfs.forceBlockingFlush();
+
+ createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+ cfs.enableAutoCompaction();
+ List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+ //Force a second MV on the same base table, which will restart the first MV builder...
+ createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+ //Compact the base table
+ FBUtilities.waitOnFutures(futures);
+
+ while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+ }
}