You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/09/17 00:47:51 UTC

git commit: Invalidate prepared stmts when ks or table is dropped

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 109313c3e -> 90a211455


Invalidate prepared stmts when ks or table is dropped

Patch by Viju Kothuvatiparambil; review by Tyler Hobbs for
CASSANDRA-7566


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90a21145
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90a21145
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90a21145

Branch: refs/heads/cassandra-2.1
Commit: 90a2114551bf2052e2abccbc04c654ceec74c2d3
Parents: 109313c
Author: Viju Kothuvatiparambil <vi...@mindmax.us>
Authored: Tue Sep 16 17:46:53 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Sep 16 17:46:53 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 67 +++++++++++++++
 .../cql3/PreparedStatementCleanupTest.java      | 86 ++++++++++++++++++++
 3 files changed, 155 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1764a20..ca578f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.1
+ * Invalidate prepared statements when their keyspace or table is
+   dropped (CASSANDRA-7566)
  * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
  * Fix saving caches when a table is dropped (CASSANDRA-7784)
  * Add better error checking of new stress profile (CASSANDRA-7716)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index a5be108..efd1ebb 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -29,6 +29,8 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import org.antlr.runtime.*;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,6 +148,7 @@ public class QueryProcessor implements QueryHandler
 
     private QueryProcessor()
     {
+        MigrationManager.instance.register(new MigrationSubscriber());
     }
 
     public ParsedStatement.Prepared getPrepared(MD5Digest id)
@@ -508,4 +511,68 @@ public class QueryProcessor implements QueryHandler
              ? ((MeasurableForPreparedCache)key).measureForPreparedCache(meter)
              : meter.measureDeep(key);
     }
+
+    private static class MigrationSubscriber implements IMigrationListener
+    {
+        private void removeInvalidPreparedStatements(String ksName, String cfName)
+        {
+            Iterator<ParsedStatement.Prepared> iterator = preparedStatements.values().iterator();
+            while (iterator.hasNext())
+            {
+                if (shouldInvalidate(ksName, cfName, iterator.next().statement))
+                    iterator.remove();
+            }
+
+            Iterator<CQLStatement> thriftIterator = thriftPreparedStatements.values().iterator();
+            while (thriftIterator.hasNext())
+            {
+                if (shouldInvalidate(ksName, cfName, thriftIterator.next()))
+                    thriftIterator.remove();
+            }
+        }
+
+        private boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement)
+        {
+            String statementKsName;
+            String statementCfName;
+
+            if (statement instanceof ModificationStatement)
+            {
+                ModificationStatement modificationStatement = ((ModificationStatement) statement);
+                statementKsName = modificationStatement.keyspace();
+                statementCfName = modificationStatement.columnFamily();
+            }
+            else if (statement instanceof SelectStatement)
+            {
+                SelectStatement selectStatement = ((SelectStatement) statement);
+                statementKsName = selectStatement.keyspace();
+                statementCfName = selectStatement.columnFamily();
+            }
+            else
+            {
+                return false;
+            }
+
+            return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
+        }
+
+        public void onCreateKeyspace(String ksName) { }
+        public void onCreateColumnFamily(String ksName, String cfName) { }
+        public void onCreateUserType(String ksName, String typeName) { }
+        public void onUpdateKeyspace(String ksName) { }
+        public void onUpdateColumnFamily(String ksName, String cfName) { }
+        public void onUpdateUserType(String ksName, String typeName) { }
+
+        public void onDropKeyspace(String ksName)
+        {
+            removeInvalidPreparedStatements(ksName, null);
+        }
+
+        public void onDropColumnFamily(String ksName, String cfName)
+        {
+            removeInvalidPreparedStatements(ksName, cfName);
+        }
+
+        public void onDropUserType(String ksName, String typeName) { }
+	}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
new file mode 100644
index 0000000..3e725e9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PreparedStatementCleanupTest extends SchemaLoader
+{
+    private static Cluster cluster;
+    private static Session session;
+
+    private static final String KEYSPACE = "prepared_stmt_cleanup";
+    private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
+                                                    " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+    private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        Schema.instance.clear();
+
+        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        // Currently the native server start method returns before the server is fully bound to the socket, so we need
+        // to wait slightly before trying to connect to it. We should fix this, but in the meantime we use a sleep.
+        Thread.sleep(500);
+
+		cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
+                                   .build();
+        session = cluster.connect();
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+	}
+
+    @AfterClass
+    public static void tearDown() throws Exception
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementsOnDrop()
+    {
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text);";
+        String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;";
+
+        session.execute(createTableStatement);
+        PreparedStatement prepared = session.prepare("INSERT INTO " + KEYSPACE + ".qp_cleanup (id, cid, val) VALUES (?, ?, ?)");
+        session.execute(dropTableStatement);
+        session.execute(createTableStatement);
+        session.execute(prepared.bind(1, 1, "value"));
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+        session.execute(prepared.bind(1, 1, "value"));
+        session.execute(dropKsStatement);
+	}
+}