You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/04/04 16:48:47 UTC

[1/8] cassandra git commit: Avoid filtering sstables based on generation when ViewBuilder restarts

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 828ca7cc9 -> 2f1ab4a42
  refs/heads/cassandra-3.11 e8053dd8b -> be96c2840
  refs/heads/trunk 633babf0f -> 522ddba27


Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f1ab4a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f1ab4a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f1ab4a4

Branch: refs/heads/cassandra-3.0
Commit: 2f1ab4a4248ac24c890e195cd5714ca54510c19a
Parents: 828ca7c
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:45:52 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 57 ++++++++++---------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c258203..d9a97ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.13
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
  * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 845a6ab..e471349 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -208,7 +208,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -216,6 +220,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 37c0e7b..94314fd 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -74,8 +75,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     {
         AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -97,49 +102,41 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
+        {
+            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
             return;
-
+        }
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild is stopped and
+            //restarted the max generation filter may yield no sstables due to compactions.
+            //We only care about max generation *during* a build, not across builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -154,6 +151,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || prevToken.compareTo(token) != 0)
                             {
@@ -162,12 +160,20 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
+            {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+            }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -198,6 +204,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 2070bef..e595ebd 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertTrue;
@@ -1203,4 +1207,59 @@ public class ViewTest extends CQLTester
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1));
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv"), row(1, 0));
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }


[5/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/449400be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/449400be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/449400be

Branch: refs/heads/trunk
Commit: 449400be30a591a69499f2f81dedefdbe0e14785
Parents: e8053dd 2f1ab4a
Author: T Jake Luciani <ja...@apache.org>
Authored: Tue Apr 4 12:46:12 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:46:12 2017 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[6/8] cassandra git commit: Avoid filtering sstables based on generation when ViewBuilder restarts

Posted by ja...@apache.org.
Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be96c284
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be96c284
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be96c284

Branch: refs/heads/trunk
Commit: be96c2840b0a6b269e22bde246b84e8ef4aeef69
Parents: 449400b
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:46:42 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 52 +++++++++--------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e63e266..1ca8733 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 3.11.0
  * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
  * cdc column addition strikes again (CASSANDRA-13382)
- * Fix static column indexes (CASSANDRA-13277) 
+ * Fix static column indexes (CASSANDRA-13277)
  * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
  * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
  * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
@@ -19,6 +19,8 @@
  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
  * Address message coalescing regression (CASSANDRA-12676)
 Merged from 3.0:
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+ * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
  * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0b8de9e..e4c6e02 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -197,7 +197,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -205,6 +209,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 9550e1e..8e647ea 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -72,8 +73,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     private void buildKey(DecoratedKey key)
     {
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -96,55 +101,46 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         UUID localHostId = SystemKeyspace.getLocalHostId();
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
         {
+            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
             if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
                 updateDistributed(ksname, viewName, localHostId);
             return;
         }
 
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild is stopped and
+            //restarted the max generation filter may yield no sstables due to compactions.
+            //We only care about max generation *during* a build, not across builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -160,6 +156,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || prevToken.compareTo(token) != 0)
                             {
@@ -168,15 +165,21 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
             {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
                 updateDistributed(ksname, viewName, localHostId);
             }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -222,6 +225,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index ac065e6..6f6e04d 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -1234,4 +1238,59 @@ public class ViewTest extends CQLTester
         {
         }
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }


[3/8] cassandra git commit: Avoid filtering sstables based on generation when ViewBuilder restarts

Posted by ja...@apache.org.
Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f1ab4a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f1ab4a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f1ab4a4

Branch: refs/heads/trunk
Commit: 2f1ab4a4248ac24c890e195cd5714ca54510c19a
Parents: 828ca7c
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:45:52 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 57 ++++++++++---------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c258203..d9a97ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.13
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
  * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 845a6ab..e471349 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -208,7 +208,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -216,6 +220,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 37c0e7b..94314fd 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -74,8 +75,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     {
         AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -97,49 +102,41 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
+        {
+            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
             return;
-
+        }
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild is stopped and
+            //restarted the max generation filter may yield no sstables due to compactions.
+            //We only care about max generation *during* a build, not across builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -154,6 +151,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || prevToken.compareTo(token) != 0)
                             {
@@ -162,12 +160,20 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
+            {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+            }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -198,6 +204,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 2070bef..e595ebd 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertTrue;
@@ -1203,4 +1207,59 @@ public class ViewTest extends CQLTester
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1));
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv"), row(1, 0));
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }


[2/8] cassandra git commit: Avoid filtering sstables based on generation when ViewBuilder restarts

Posted by ja...@apache.org.
Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f1ab4a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f1ab4a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f1ab4a4

Branch: refs/heads/cassandra-3.11
Commit: 2f1ab4a4248ac24c890e195cd5714ca54510c19a
Parents: 828ca7c
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:45:52 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 57 ++++++++++---------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c258203..d9a97ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.13
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
  * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 845a6ab..e471349 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -208,7 +208,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -216,6 +220,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 37c0e7b..94314fd 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -74,8 +75,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     {
         AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -97,49 +102,41 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
+        {
+            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
             return;
-
+        }
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild is stopped and
+            //restarted the max generation filter may yield no sstables due to compactions.
+            //We only care about max generation *during* a build, not across builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -154,6 +151,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || prevToken.compareTo(token) != 0)
                             {
@@ -162,12 +160,20 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
+            {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+            }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -198,6 +204,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f1ab4a4/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 2070bef..e595ebd 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertTrue;
@@ -1203,4 +1207,59 @@ public class ViewTest extends CQLTester
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1));
         assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv"), row(1, 0));
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }


[4/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/449400be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/449400be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/449400be

Branch: refs/heads/cassandra-3.11
Commit: 449400be30a591a69499f2f81dedefdbe0e14785
Parents: e8053dd 2f1ab4a
Author: T Jake Luciani <ja...@apache.org>
Authored: Tue Apr 4 12:46:12 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:46:12 2017 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[8/8] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/522ddba2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/522ddba2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/522ddba2

Branch: refs/heads/trunk
Commit: 522ddba275926761bf2870b67664cf318401f158
Parents: 633babf be96c28
Author: T Jake Luciani <ja...@apache.org>
Authored: Tue Apr 4 12:48:01 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:48:01 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 +-
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 51 +++++++++--------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/522ddba2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3147eb9,1ca8733..ffe958d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -70,7 -19,11 +70,10 @@@
   * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
   * Address message coalescing regression (CASSANDRA-12676)
  Merged from 3.0:
+  * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+  * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
+  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
   * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
 - * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)
   * Legacy deserializer can create empty range tombstones (CASSANDRA-13341)
   * Legacy caching options can prevent 3.0 upgrade (CASSANDRA-13384)
   * Use the Kernel32 library to retrieve the PID on Windows and fix startup checks (CASSANDRA-13333)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/522ddba2/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/522ddba2/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 6250ae7,8e647ea..af86392
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@@ -96,9 -101,10 +101,10 @@@ public class ViewBuilder extends Compac
  
      public void run()
      {
+         logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
 -        logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
 +        logger.trace("Running view builder for {}.{}", baseCfs.metadata.keyspace, view.name);
          UUID localHostId = SystemKeyspace.getLocalHostId();
 -        String ksname = baseCfs.metadata.ksName, viewName = view.name;
 +        String ksname = baseCfs.metadata.keyspace, viewName = view.name;
  
          if (SystemKeyspace.isViewBuilt(ksname, viewName))
          {
@@@ -222,7 -225,8 +224,8 @@@
              if (lastToken == null || range.contains(lastToken))
                  rangesLeft = 0;
          }
+ 
 -        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
 +        return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
      }
  
      public void stop()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/522ddba2/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------


[7/8] cassandra git commit: Avoid filtering sstables based on generation when ViewBuilder restarts

Posted by ja...@apache.org.
Avoid filtering sstables based on generation when ViewBuilder restarts

Patch by tjake; reviewed by Paulo Motta for CASSANDRA-13405


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be96c284
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be96c284
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be96c284

Branch: refs/heads/cassandra-3.11
Commit: be96c2840b0a6b269e22bde246b84e8ef4aeef69
Parents: 449400b
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Apr 3 13:56:04 2017 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Apr 4 12:46:42 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 src/java/org/apache/cassandra/db/view/View.java |  5 ++
 .../apache/cassandra/db/view/ViewBuilder.java   | 52 +++++++++--------
 .../org/apache/cassandra/cql3/ViewTest.java     | 59 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e63e266..1ca8733 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,7 @@
 3.11.0
  * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
  * cdc column addition strikes again (CASSANDRA-13382)
- * Fix static column indexes (CASSANDRA-13277) 
+ * Fix static column indexes (CASSANDRA-13277)
  * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
  * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
  * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
@@ -19,6 +19,8 @@
  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
  * Address message coalescing regression (CASSANDRA-12676)
 Merged from 3.0:
+ * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
+ * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
  * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
  * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
  * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0b8de9e..e4c6e02 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -197,7 +197,11 @@ public class View
     public ReadQuery getReadQuery()
     {
         if (query == null)
+        {
             query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+            logger.trace("View query: {}", rawSelect);
+        }
+
         return query;
     }
 
@@ -205,6 +209,7 @@ public class View
     {
         if (this.builder != null)
         {
+            logger.debug("Stopping current view builder due to schema change");
             this.builder.stop();
             this.builder = null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 9550e1e..8e647ea 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
@@ -72,8 +73,12 @@ public class ViewBuilder extends CompactionInfo.Holder
     private void buildKey(DecoratedKey key)
     {
         ReadQuery selectQuery = view.getReadQuery();
+
         if (!selectQuery.selectsKey(key))
+        {
+            logger.trace("Skipping {}, view query filters", key);
             return;
+        }
 
         int nowInSec = FBUtilities.nowInSeconds();
         SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
@@ -96,55 +101,46 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         UUID localHostId = SystemKeyspace.getLocalHostId();
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
         {
+            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
             if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
                 updateDistributed(ksname, viewName, localHostId);
             return;
         }
 
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+
         final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
         Token lastToken;
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         if (buildStatus == null)
         {
-            baseCfs.forceBlockingFlush();
-            function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
             lastToken = null;
+
+            //We don't track the generation number anymore since if a rebuild is stopped and
+            //restarted the max generation filter may yield no sstables due to compactions.
+            //We only care about max generation *during* a build, not across builds.
+            //see CASSANDRA-13405
+            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
         }
         else
         {
-            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
-                {
-                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
             lastToken = buildStatus.right;
+            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
         }
 
+        baseCfs.forceBlockingFlush();
+        function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);
+
         prevToken = lastToken;
+        long keysBuilt = 0;
         try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
              ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
         {
@@ -160,6 +156,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                         if (range.contains(token))
                         {
                             buildKey(key);
+                            ++keysBuilt;
 
                             if (prevToken == null || prevToken.compareTo(token) != 0)
                             {
@@ -168,15 +165,21 @@ public class ViewBuilder extends CompactionInfo.Holder
                             }
                         }
                     }
+
                     lastToken = null;
                 }
             }
 
             if (!isStopped)
             {
+                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                 SystemKeyspace.finishViewBuildStatus(ksname, viewName);
                 updateDistributed(ksname, viewName, localHostId);
             }
+            else
+            {
+                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
+            }
         }
         catch (Exception e)
         {
@@ -222,6 +225,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             if (lastToken == null || range.contains(lastToken))
                 rangesLeft = 0;
         }
+
         return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be96c284/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index ac065e6..6f6e04d 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.cql3;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import junit.framework.Assert;
 import org.junit.After;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -1234,4 +1238,59 @@ public class ViewTest extends CQLTester
         {
         }
     }
+
+    @Test
+    public void testViewBuilderResume() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "k int, " +
+                    "c int, " +
+                    "val text, " +
+                    "PRIMARY KEY(k,c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        CompactionManager.instance.setCoreCompactorThreads(1);
+        CompactionManager.instance.setMaximumCompactorThreads(1);
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        for (int i = 0; i < 1024; i++)
+            execute("INSERT into %s (k,c,val)VALUES(?,?,?)", i, i, ""+i);
+
+        cfs.forceBlockingFlush();
+
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+        cfs.enableAutoCompaction();
+        List<Future<?>> futures = CompactionManager.instance.submitBackground(cfs);
+
+        //Force a second MV on the same base table, which will restart the first MV builder...
+        createView("mv_test2", "CREATE MATERIALIZED VIEW %s AS SELECT val, k, c FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (val,k,c)");
+
+
+        //Compact the base table
+        FBUtilities.waitOnFutures(futures);
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
+    }
 }