You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/09/17 00:47:51 UTC
git commit: Invalidate prepared stmts when ks or table is dropped
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 109313c3e -> 90a211455
Invalidate prepared stmts when ks or table is dropped
Patch by Viju Kothuvatiparambil; review by Tyler Hobbs for
CASSANDRA-7566
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90a21145
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90a21145
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90a21145
Branch: refs/heads/cassandra-2.1
Commit: 90a2114551bf2052e2abccbc04c654ceec74c2d3
Parents: 109313c
Author: Viju Kothuvatiparambil <vi...@mindmax.us>
Authored: Tue Sep 16 17:46:53 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Sep 16 17:46:53 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/cql3/QueryProcessor.java | 67 +++++++++++++++
.../cql3/PreparedStatementCleanupTest.java | 86 ++++++++++++++++++++
3 files changed, 155 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1764a20..ca578f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.1
+ * Invalidate prepared statements when their keyspace or table is
+ dropped (CASSANDRA-7566)
* cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
* Fix saving caches when a table is dropped (CASSANDRA-7784)
* Add better error checking of new stress profile (CASSANDRA-7716)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index a5be108..efd1ebb 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -29,6 +29,8 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
import com.googlecode.concurrentlinkedhashmap.EvictionListener;
import org.antlr.runtime.*;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,6 +148,7 @@ public class QueryProcessor implements QueryHandler
private QueryProcessor()
{
+ MigrationManager.instance.register(new MigrationSubscriber());
}
public ParsedStatement.Prepared getPrepared(MD5Digest id)
@@ -508,4 +511,68 @@ public class QueryProcessor implements QueryHandler
? ((MeasurableForPreparedCache)key).measureForPreparedCache(meter)
: meter.measureDeep(key);
}
+
+ private static class MigrationSubscriber implements IMigrationListener
+ {
+ private void removeInvalidPreparedStatements(String ksName, String cfName)
+ {
+ Iterator<ParsedStatement.Prepared> iterator = preparedStatements.values().iterator();
+ while (iterator.hasNext())
+ {
+ if (shouldInvalidate(ksName, cfName, iterator.next().statement))
+ iterator.remove();
+ }
+
+ Iterator<CQLStatement> thriftIterator = thriftPreparedStatements.values().iterator();
+ while (thriftIterator.hasNext())
+ {
+ if (shouldInvalidate(ksName, cfName, thriftIterator.next()))
+ thriftIterator.remove();
+ }
+ }
+
+ private boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement)
+ {
+ String statementKsName;
+ String statementCfName;
+
+ if (statement instanceof ModificationStatement)
+ {
+ ModificationStatement modificationStatement = ((ModificationStatement) statement);
+ statementKsName = modificationStatement.keyspace();
+ statementCfName = modificationStatement.columnFamily();
+ }
+ else if (statement instanceof SelectStatement)
+ {
+ SelectStatement selectStatement = ((SelectStatement) statement);
+ statementKsName = selectStatement.keyspace();
+ statementCfName = selectStatement.columnFamily();
+ }
+ else
+ {
+ return false;
+ }
+
+ return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
+ }
+
+ public void onCreateKeyspace(String ksName) { }
+ public void onCreateColumnFamily(String ksName, String cfName) { }
+ public void onCreateUserType(String ksName, String typeName) { }
+ public void onUpdateKeyspace(String ksName) { }
+ public void onUpdateColumnFamily(String ksName, String cfName) { }
+ public void onUpdateUserType(String ksName, String typeName) { }
+
+ public void onDropKeyspace(String ksName)
+ {
+ removeInvalidPreparedStatements(ksName, null);
+ }
+
+ public void onDropColumnFamily(String ksName, String cfName)
+ {
+ removeInvalidPreparedStatements(ksName, cfName);
+ }
+
+ public void onDropUserType(String ksName, String typeName) { }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
new file mode 100644
index 0000000..3e725e9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PreparedStatementCleanupTest extends SchemaLoader
+{
+ private static Cluster cluster;
+ private static Session session;
+
+ private static final String KEYSPACE = "prepared_stmt_cleanup";
+ private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
+ " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+ private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ Schema.instance.clear();
+
+ EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+ // Currently the native server start method return before the server is fully binded to the socket, so we need
+ // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep.
+ Thread.sleep(500);
+
+ cluster = Cluster.builder().addContactPoint("127.0.0.1")
+ .withPort(DatabaseDescriptor.getNativeTransportPort())
+ .build();
+ session = cluster.connect();
+
+ session.execute(dropKsStatement);
+ session.execute(createKsStatement);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception
+ {
+ cluster.close();
+ }
+
+ @Test
+ public void testInvalidatePreparedStatementsOnDrop()
+ {
+ String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text);";
+ String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;";
+
+ session.execute(createTableStatement);
+ PreparedStatement prepared = session.prepare("INSERT INTO " + KEYSPACE + ".qp_cleanup (id, cid, val) VALUES (?, ?, ?)");
+ session.execute(dropTableStatement);
+ session.execute(createTableStatement);
+ session.execute(prepared.bind(1, 1, "value"));
+
+ session.execute(dropKsStatement);
+ session.execute(createKsStatement);
+ session.execute(createTableStatement);
+ session.execute(prepared.bind(1, 1, "value"));
+ session.execute(dropKsStatement);
+ }
+}