You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/07/27 23:32:58 UTC
cassandra git commit: Support Restricting non-PK Cols in MV Select
Statements
Repository: cassandra
Updated Branches:
refs/heads/trunk 503148b3d -> 5051c0f6e
Support Restricting non-PK Cols in MV Select Statements
Patch by Jochen Niebuhr; reviewed by Tyler Hobbs for CASSANDRA-10368
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5051c0f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5051c0f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5051c0f6
Branch: refs/heads/trunk
Commit: 5051c0f6eb3f984600600c9577d6b5ece9038c74
Parents: 503148b
Author: Jochen Niebuhr <jn...@cfire.de>
Authored: Tue Jul 26 13:09:35 2016 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed Jul 27 18:32:41 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cql3/statements/CreateViewStatement.java | 8 -
.../cassandra/cql3/ViewFilteringTest.java | 320 +++++++++++++++++++
3 files changed, 322 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04b2a3f..7ad965e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.10
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
* Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
* COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
* Faster write path (CASSANDRA-12269)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 5e14e72..8fe9f7e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -203,14 +203,6 @@ public class CreateViewStatement extends SchemaAlteringStatement
if (!prepared.boundNames.isEmpty())
throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");
- if (!restrictions.nonPKRestrictedColumns(false).isEmpty())
- {
- throw new InvalidRequestException(String.format(
- "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " +
- "creation (got restrictions on: %s)",
- restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", "))));
- }
-
String whereClauseText = View.relationsToWhereClause(whereClause.relations);
Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
index 245ceb7..12cb673 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java
@@ -28,7 +28,15 @@ import org.junit.Test;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import junit.framework.Assert;
+import org.apache.cassandra.concurrent.SEPExecutor;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
public class ViewFilteringTest extends CQLTester
{
@@ -61,6 +69,16 @@ public class ViewFilteringTest extends CQLTester
views.add(name);
}
+ private void updateView(String query, Object... params) throws Throwable
+ {
+ executeNet(protocolVersion, query, params);
+ while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
+ && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
+ {
+ Thread.sleep(1);
+ }
+ }
+
private void dropView(String name) throws Throwable
{
executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + name);
@@ -1303,4 +1321,306 @@ public class ViewFilteringTest extends CQLTester
executeNet(protocolVersion, "ALTER TABLE %s RENAME inetval TO foo");
assert !execute("SELECT * FROM mv_test").isEmpty();
}
+
+ @Test
+ public void testMVCreationWithNonPrimaryRestrictions() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ try {
+ createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE b IS NOT NULL AND c IS NOT NULL AND d = 1 PRIMARY KEY (a, b, c)");
+ dropView("mv_test");
+ } catch(Exception e) {
+ throw new RuntimeException("MV creation with non primary column restrictions failed.", e);
+ }
+
+ dropTable("DROP TABLE %s");
+ }
+
+ @Test
+ public void testNonPrimaryRestrictions() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 1, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 0, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 1, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 0, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 0);
+
+ // only accept rows where c = 1
+ createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (a, b, c)");
+
+ while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test"))
+ Thread.sleep(10);
+
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 0),
+ row(1, 1, 1, 0)
+ );
+
+ // insert new rows that do not match the filter
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 0, 0, 0);
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 1, 2, 0);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 0),
+ row(1, 1, 1, 0)
+ );
+
+ // insert new row that does match the filter
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 2, 1, 0);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 0),
+ row(1, 1, 1, 0),
+ row(1, 2, 1, 0)
+ );
+
+ // update rows that don't match the filter
+ execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 2, 2, 0);
+ execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 2, 1);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 0),
+ row(1, 1, 1, 0),
+ row(1, 2, 1, 0)
+ );
+
+ // update a row that does match the filter
+ execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 1, 0);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 1),
+ row(1, 1, 1, 0),
+ row(1, 2, 1, 0)
+ );
+
+ // delete rows that don't match the filter
+ execute("DELETE FROM %s WHERE a = ? AND b = ?", 2, 0);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 1),
+ row(1, 1, 1, 0),
+ row(1, 2, 1, 0)
+ );
+
+ // delete a row that does match the filter
+ execute("DELETE FROM %s WHERE a = ? AND b = ?", 1, 2);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0),
+ row(1, 0, 1, 1),
+ row(1, 1, 1, 0)
+ );
+
+ // delete a partition that matches the filter
+ execute("DELETE FROM %s WHERE a = ?", 1);
+ assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"),
+ row(0, 0, 1, 0),
+ row(0, 1, 1, 0)
+ );
+
+ dropView("mv_test");
+ dropTable("DROP TABLE %s");
+ }
+
+ @Test
+ public void complexRestrictedTimestampUpdateTestWithFlush() throws Throwable
+ {
+ complexRestrictedTimestampUpdateTest(true);
+ }
+
+ @Test
+ public void complexRestrictedTimestampUpdateTestWithoutFlush() throws Throwable
+ {
+ complexRestrictedTimestampUpdateTest(false);
+ }
+
+ public void complexRestrictedTimestampUpdateTest(boolean flush) throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, PRIMARY KEY (a, b))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+ Keyspace ks = Keyspace.open(keyspace());
+
+ createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (c, a, b)");
+ ks.getColumnFamilyStore("mv").disableAutoCompaction();
+
+ //Set initial values TS=0, matching the restriction and verify view
+ executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d) VALUES (0, 0, 1, 0) USING TIMESTAMP 0");
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+ if (flush)
+ FBUtilities.waitOnFutures(ks.flush());
+
+ //update c's timestamp TS=2
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0));
+
+ if (flush)
+ FBUtilities.waitOnFutures(ks.flush());
+
+ //change c's value and TS=3, tombstones c=1 and adds c=0 record
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? and b = ? ", 0, 0, 0);
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 0, 0, 0));
+
+ if(flush)
+ {
+ ks.getColumnFamilyStore("mv").forceMajorCompaction();
+ FBUtilities.waitOnFutures(ks.flush());
+ }
+
+ //change c's value back to 1 with TS=4, check we can see d
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0);
+ if (flush)
+ {
+ ks.getColumnFamilyStore("mv").forceMajorCompaction();
+ FBUtilities.waitOnFutures(ks.flush());
+ }
+
+ assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, null));
+
+
+ //Add e value @ TS=1
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 1 SET e = ? WHERE a = ? and b = ? ", 1, 0, 0);
+ assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, 1));
+
+ if (flush)
+ FBUtilities.waitOnFutures(ks.flush());
+
+
+ //Change d value @ TS=2
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET d = ? WHERE a = ? and b = ? ", 2, 0, 0);
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(2));
+
+ if (flush)
+ FBUtilities.waitOnFutures(ks.flush());
+
+
+ //Change d value @ TS=3
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? and b = ? ", 1, 0, 0);
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(1));
+
+
+ //Tombstone c
+ executeNet(protocolVersion, "DELETE FROM %s WHERE a = ? and b = ?", 0, 0);
+ assertRows(execute("SELECT d from mv"));
+
+ //Add back without D
+ executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (0, 0, 1)");
+
+ //Make sure D doesn't pop back in.
+ assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row((Object) null));
+
+
+ //New partition
+ // insert a row with timestamp 0
+ executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d, e) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP 0", 1, 0, 1, 0, 0);
+
+ // overwrite pk and e with timestamp 1, but don't overwrite d
+ executeNet(protocolVersion, "INSERT INTO %s (a, b, c, e) VALUES (?, ?, ?, ?) USING TIMESTAMP 1", 1, 0, 1, 0);
+
+ // delete with timestamp 0 (which should only delete d)
+ executeNet(protocolVersion, "DELETE FROM %s USING TIMESTAMP 0 WHERE a = ? AND b = ?", 1, 0);
+ assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+ row(1, 0, 1, null, 0)
+ );
+
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? AND b = ?", 1, 1, 1);
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? AND b = ?", 1, 1, 0);
+ assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+ row(1, 0, 1, null, 0)
+ );
+
+ executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? AND b = ?", 0, 1, 0);
+ assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0),
+ row(1, 0, 1, 0, 0)
+ );
+ }
+
+ @Test
+ public void testRestrictedRegularColumnTimestampUpdates() throws Throwable
+ {
+ // Regression test for CASSANDRA-10910
+
+ createTable("CREATE TABLE %s (" +
+ "k int PRIMARY KEY, " +
+ "c int, " +
+ "val int)");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv_rctstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (k,c)");
+
+ updateView("UPDATE %s SET c = ?, val = ? WHERE k = ?", 0, 0, 0);
+ updateView("UPDATE %s SET val = ? WHERE k = ?", 1, 0);
+ updateView("UPDATE %s SET c = ? WHERE k = ?", 1, 0);
+ assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 1));
+
+ updateView("TRUNCATE %s");
+
+ updateView("UPDATE %s USING TIMESTAMP 1 SET c = ?, val = ? WHERE k = ?", 0, 0, 0);
+ updateView("UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE k = ?", 1, 0);
+ updateView("UPDATE %s USING TIMESTAMP 2 SET val = ? WHERE k = ?", 1, 0);
+ updateView("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE k = ?", 1, 0);
+ updateView("UPDATE %s USING TIMESTAMP 3 SET val = ? WHERE k = ?", 2, 0);
+ assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 2));
+ }
+
+ @Test
+ public void testOldTimestampsWithRestrictions() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "k int, " +
+ "c int, " +
+ "val text, " + "" +
+ "PRIMARY KEY(k, c))");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv_tstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL AND val = 'baz' PRIMARY KEY (val,k,c)");
+
+ for (int i = 0; i < 100; i++)
+ updateView("INSERT into %s (k,c,val)VALUES(?,?,?)", 0, i % 2, "baz");
+
+ Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+
+ Assert.assertEquals(2, execute("select * from %s").size());
+ Assert.assertEquals(2, execute("select * from mv_tstest").size());
+
+ assertRows(execute("SELECT val from %s where k = 0 and c = 0"), row("baz"));
+ assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1));
+
+ //Make sure an old TS does nothing
+ updateView("UPDATE %s USING TIMESTAMP 100 SET val = ? where k = ? AND c = ?", "bar", 0, 1);
+ assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("baz"));
+ assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1));
+ assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar"));
+
+ //Latest TS
+ updateView("UPDATE %s SET val = ? where k = ? AND c = ?", "bar", 0, 1);
+ assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("bar"));
+ assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar"));
+ assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0));
+ }
}