You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/07/27 23:32:58 UTC

cassandra git commit: Support Restricting non-PK Cols in MV Select Statements

Repository: cassandra
Updated Branches:
  refs/heads/trunk 503148b3d -> 5051c0f6e


Support Restricting non-PK Cols in MV Select Statements

Patch by Jochen Niebuhr; reviewed by Tyler Hobbs for CASSANDRA-10368


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5051c0f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5051c0f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5051c0f6

Branch: refs/heads/trunk
Commit: 5051c0f6eb3f984600600c9577d6b5ece9038c74
Parents: 503148b
Author: Jochen Niebuhr <jn...@cfire.de>
Authored: Tue Jul 26 13:09:35 2016 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed Jul 27 18:32:41 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cql3/statements/CreateViewStatement.java    |   8 -
 .../cassandra/cql3/ViewFilteringTest.java       | 320 +++++++++++++++++++
 3 files changed, 322 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04b2a3f..7ad965e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.10
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
  * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
  * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
  * Faster write path (CASSANDRA-12269)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 5e14e72..8fe9f7e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -203,14 +203,6 @@ public class CreateViewStatement extends SchemaAlteringStatement
         if (!prepared.boundNames.isEmpty())
             throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");
 
-        if (!restrictions.nonPKRestrictedColumns(false).isEmpty())
-        {
-            throw new InvalidRequestException(String.format(
-                    "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " +
-                    "creation (got restrictions on: %s)",
-                    restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", "))));
-        }
-
         String whereClauseText = View.relationsToWhereClause(whereClause.relations);
 
         Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
index 245ceb7..12cb673 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
@@ -28,7 +28,15 @@ import org.junit.Test;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import junit.framework.Assert;
 
+import org.apache.cassandra.concurrent.SEPExecutor;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class ViewFilteringTest extends CQLTester
 {
@@ -61,6 +69,16 @@ public class ViewFilteringTest extends CQLTester
         views.add(name);
     }
 
+    private void updateView(String query, Object... params) throws Throwable
+    {
+        executeNet(protocolVersion, query, params);
+        while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
+            && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
+        {
+            Thread.sleep(1);
+        }
+    }
+
     private void dropView(String name) throws Throwable
     {
         executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + name);
@@ -1303,4 +1321,306 @@ public class ViewFilteringTest extends CQLTester
         executeNet(protocolVersion, "ALTER TABLE %s RENAME inetval TO foo");
         assert !execute("SELECT * FROM mv_test").isEmpty();
     }
+
+    @Test
+    public void testMVCreationWithNonPrimaryRestrictions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        try {
+            createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE b IS NOT NULL AND c IS NOT NULL AND d = 1 PRIMARY KEY (a, b, c)");
+            dropView("mv_test");
+        } catch(Exception e) {
+            throw new RuntimeException("MV creation with non primary column restrictions failed.", e);
+        }
+
+        dropTable("DROP TABLE %s");
+    }
+
+    @Test
+    public void testNonPrimaryRestrictions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 1, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 1, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 0);
+
+        // only accept rows where c = 1
+        createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (a, b, c)");
+
+        while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+            Thread.sleep(10);
+
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 0),
+            row(1, 1, 1, 0)
+        );
+
+        // insert new rows that do not match the filter
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 0, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 1, 2, 0);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 0),
+            row(1, 1, 1, 0)
+        );
+
+        // insert new row that does match the filter
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 2, 1, 0);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 0),
+            row(1, 1, 1, 0),
+            row(1, 2, 1, 0)
+        );
+
+        // update rows that don't match the filter
+        execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 2, 2, 0);
+        execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 2, 1);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 0),
+            row(1, 1, 1, 0),
+            row(1, 2, 1, 0)
+        );
+
+        // update a row that does match the filter
+        execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 1, 0);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 1),
+            row(1, 1, 1, 0),
+            row(1, 2, 1, 0)
+        );
+
+        // delete rows that don't match the filter
+        execute("DELETE FROM %s WHERE a = ? AND b = ?", 2, 0);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 1),
+            row(1, 1, 1, 0),
+            row(1, 2, 1, 0)
+        );
+
+        // delete a row that does match the filter
+        execute("DELETE FROM %s WHERE a = ? AND b = ?", 1, 2);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0),
+            row(1, 0, 1, 1),
+            row(1, 1, 1, 0)
+        );
+
+        // delete a partition that matches the filter
+        execute("DELETE FROM %s WHERE a = ?", 1);
+        assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+            row(0, 0, 1, 0),
+            row(0, 1, 1, 0)
+        );
+
+        dropView("mv_test");
+        dropTable("DROP TABLE %s");
+    }
+
+    @Test
+    public void complexRestrictedTimestampUpdateTestWithFlush() throws Throwable
+    {
+        complexRestrictedTimestampUpdateTest(true);
+    }
+
+    @Test
+    public void complexRestrictedTimestampUpdateTestWithoutFlush() throws Throwable
+    {
+        complexRestrictedTimestampUpdateTest(false);
+    }
+
+    public void complexRestrictedTimestampUpdateTest(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, PRIMARY KEY (a, b))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+        Keyspace ks = Keyspace.open(keyspace());
+
+        createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (c, a, b)");
+        ks.getColumnFamilyStore("mv").disableAutoCompaction();
+
+        //Set initial values TS=0, matching the restriction and verify view
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d) VALUES (0, 0, 1, 0) USING TIMESTAMP 0");
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+        //update c's timestamp TS=2
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+        //change c's value and TS=3, tombstones c=1 and adds c=0 record
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? and b = ? ", 0, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 0, 0, 0));
+
+        if(flush)
+        {
+            ks.getColumnFamilyStore("mv").forceMajorCompaction();
+            FBUtilities.waitOnFutures(ks.flush());
+        }
+
+        //change c's value back to 1 with TS=4, check we can see d
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        if (flush)
+        {
+            ks.getColumnFamilyStore("mv").forceMajorCompaction();
+            FBUtilities.waitOnFutures(ks.flush());
+        }
+
+        assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, null));
+
+
+        //Add e value @ TS=1
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 1 SET e = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, 1));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+
+        //Change d value @ TS=2
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET d = ? WHERE a = ? and b = ? ", 2, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(2));
+
+        if (flush)
+            FBUtilities.waitOnFutures(ks.flush());
+
+
+        //Change d value @ TS=3
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? and b = ? ", 1, 0, 0);
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(1));
+
+
+        //Tombstone c
+        executeNet(protocolVersion, "DELETE FROM %s WHERE a = ? and b = ?", 0, 0);
+        assertRows(execute("SELECT d from mv"));
+
+        //Add back without D
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (0, 0, 1)");
+
+        //Make sure D doesn't pop back in.
+        assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row((Object) null));
+
+
+        //New partition
+        // insert a row with timestamp 0
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d, e) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP 0", 1, 0, 1, 0, 0);
+
+        // overwrite pk and e with timestamp 1, but don't overwrite d
+        executeNet(protocolVersion, "INSERT INTO %s (a, b, c, e) VALUES (?, ?, ?, ?) USING TIMESTAMP 1", 1, 0, 1, 0);
+
+        // delete with timestamp 0 (which should only delete d)
+        executeNet(protocolVersion, "DELETE FROM %s USING TIMESTAMP 0 WHERE a = ? AND b = ?", 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+            row(1, 0, 1, null, 0)
+        );
+
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? AND b = ?", 1, 1, 1);
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? AND b = ?", 1, 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+            row(1, 0, 1, null, 0)
+        );
+
+        executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? AND b = ?", 0, 1, 0);
+        assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+            row(1, 0, 1, 0, 0)
+        );
+    }
+
+    @Test
+    public void testRestrictedRegularColumnTimestampUpdates() throws Throwable
+    {
+        // Regression test for CASSANDRA-10910
+
+        createTable("CREATE TABLE %s (" +
+            "k int PRIMARY KEY, " +
+            "c int, " +
+            "val int)");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv_rctstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (k,c)");
+
+        updateView("UPDATE %s SET c = ?, val = ? WHERE k = ?", 0, 0, 0);
+        updateView("UPDATE %s SET val = ? WHERE k = ?", 1, 0);
+        updateView("UPDATE %s SET c = ? WHERE k = ?", 1, 0);
+        assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 1));
+
+        updateView("TRUNCATE %s");
+
+        updateView("UPDATE %s USING TIMESTAMP 1 SET c = ?, val = ? WHERE k = ?", 0, 0, 0);
+        updateView("UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE k = ?", 1, 0);
+        updateView("UPDATE %s USING TIMESTAMP 2 SET val = ? WHERE k = ?", 1, 0);
+        updateView("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE k = ?", 1, 0);
+        updateView("UPDATE %s USING TIMESTAMP 3 SET val = ? WHERE k = ?", 2, 0);
+        assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 2));
+    }
+
+    @Test
+    public void testOldTimestampsWithRestrictions() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+            "k int, " +
+            "c int, " +
+            "val text, " + "" +
+            "PRIMARY KEY(k, c))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv_tstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL AND val = 'baz' PRIMARY KEY (val,k,c)");
+
+        for (int i = 0; i < 100; i++)
+            updateView("INSERT into %s (k,c,val)VALUES(?,?,?)", 0, i % 2, "baz");
+
+        Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+
+        Assert.assertEquals(2, execute("select * from %s").size());
+        Assert.assertEquals(2, execute("select * from mv_tstest").size());
+
+        assertRows(execute("SELECT val from %s where k = 0 and c = 0"), row("baz"));
+        assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1));
+
+        //Make sure an old TS does nothing
+        updateView("UPDATE %s USING TIMESTAMP 100 SET val = ? where k = ? AND c = ?", "bar", 0, 1);
+        assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("baz"));
+        assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1));
+        assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar"));
+
+        //Latest TS
+        updateView("UPDATE %s SET val = ? where k = ? AND c = ?", "bar", 0, 1);
+        assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("bar"));
+        assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar"));
+        assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0));
+    }
 }