You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/06/28 19:19:24 UTC
[2/3] cassandra git commit: Properly evict pstmts from prepared statements cache
Properly evict pstmts from prepared statements cache
patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-13641
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9562b9b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9562b9b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9562b9b6
Branch: refs/heads/trunk
Commit: 9562b9b69e08b84ec1e8e431a846548fa8a83b44
Parents: bb7e522
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 28 21:15:03 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 28 21:15:03 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 9 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 ++
test/conf/cassandra.yaml | 1 +
.../cassandra/cql3/PstmtPersistenceTest.java | 108 ++++++++++++++-----
5 files changed, 100 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4297a15..88aa1ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.1
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
* Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
* Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f5ce7e4..0e0ba3c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -88,6 +88,7 @@ public class QueryProcessor implements QueryHandler
.listener((md5Digest, prepared) -> {
metrics.preparedStatementsEvicted.inc();
lastMinuteEvictionsCount.incrementAndGet();
+ SystemKeyspace.removePreparedStatement(md5Digest);
}).build();
thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>()
@@ -162,11 +163,17 @@ public class QueryProcessor implements QueryHandler
logger.info("Preloaded {} prepared statements", count);
}
+ /**
+ * Clears the prepared statement cache.
+ * @param memoryOnly {@code true} if only the in memory caches must be cleared, {@code false} otherwise.
+ */
@VisibleForTesting
- public static void clearPrepraredStatements()
+ public static void clearPreparedStatements(boolean memoryOnly)
{
preparedStatements.clear();
thriftPreparedStatements.clear();
+ if (!memoryOnly)
+ SystemKeyspace.resetPreparedStatements();
}
private static QueryState internalQueryState()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 82c9752..6c45329 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1488,6 +1488,12 @@ public final class SystemKeyspace
key.byteBuffer());
}
+ public static void resetPreparedStatements()
+ {
+ ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS);
+ availableRanges.truncateBlocking();
+ }
+
public static List<Pair<String, String>> loadPreparedStatements()
{
String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index cf02634..96ca9a0 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -44,3 +44,4 @@ row_cache_class_name: org.apache.cassandra.cache.OHCProvider
row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
+prepared_statements_cache_size_mb: 1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 380dbda..e7adc8e 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.junit.Before;
import org.junit.Test;
import junit.framework.Assert;
@@ -36,16 +37,23 @@ import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MD5Digest;
+import static org.junit.Assert.*;
+
public class PstmtPersistenceTest extends CQLTester
{
+ @Before
+ public void setUp()
+ {
+ QueryProcessor.clearPreparedStatements(false);
+ }
+
@Test
public void testCachedPreparedStatements() throws Throwable
{
// need this for pstmt execution/validation tests
requireNetwork();
- int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size();
- Assert.assertEquals(0, rows);
+ assertEquals(0, numberOfStatementsOnDisk());
execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)");
@@ -56,30 +64,27 @@ public class PstmtPersistenceTest extends CQLTester
List<MD5Digest> stmtIds = new ArrayList<>();
// #0
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaConstants.SCHEMA_KEYSPACE_NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
// #1
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #2
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState));
clientState.setKeyspace("foo");
// #3
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #4
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %S WHERE key = ?", "foo", "bar", clientState));
- Assert.assertEquals(5, stmtIds.size());
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
-
- String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
+ assertEquals(5, stmtIds.size());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(5, rows);
+ Assert.assertEquals(5, numberOfStatementsOnDisk());
QueryHandler handler = ClientState.getCQLQueryHandler();
validatePstmts(stmtIds, handler);
// clear prepared statements cache
- QueryProcessor.clearPrepraredStatements();
+ QueryProcessor.clearPreparedStatements(true);
Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
for (MD5Digest stmtId : stmtIds)
Assert.assertNull(handler.getPrepared(stmtId));
@@ -88,7 +93,9 @@ public class PstmtPersistenceTest extends CQLTester
QueryProcessor.preloadPreparedStatement();
validatePstmts(stmtIds, handler);
+
// validate that the prepared statements are in the system table
+ String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll))
{
MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id")));
@@ -97,22 +104,19 @@ public class PstmtPersistenceTest extends CQLTester
}
// add another prepared statement and sync it to table
- QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false);
- Assert.assertEquals(6, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(6, rows);
+ prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState);
+ assertEquals(6, numberOfStatementsInMemory());
+ assertEquals(6, numberOfStatementsOnDisk());
// drop a keyspace (prepared statements are removed - syncPreparedStatements() should remove the rows, too)
execute("DROP KEYSPACE foo");
- Assert.assertEquals(3, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(3, rows);
-
+ assertEquals(3, numberOfStatementsInMemory());
+ assertEquals(3, numberOfStatementsOnDisk());
}
private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler)
{
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar")));
QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42)));
validatePstmt(handler, stmtIds.get(0), optionsStr);
@@ -125,7 +129,63 @@ public class PstmtPersistenceTest extends CQLTester
private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options)
{
ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
- Assert.assertNotNull(prepared);
+ assertNotNull(prepared);
handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
}
+
+ @Test
+ public void testPstmtInvalidation() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ for (int cnt = 1; cnt < 10000; cnt++)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);
+
+ if (numberOfEvictedStatements() > 0)
+ {
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ // prepare more statements to trigger more evictions
+ for (int cnt2 = 1; cnt2 < 10; cnt2++)
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);
+
+ // each new prepared statement should have caused an eviction
+ assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ return;
+ }
+ }
+
+ fail("Prepared statement eviction does not work");
+ }
+
+ private long numberOfStatementsOnDisk() throws Throwable
+ {
+ UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
+ return row.getLong("count");
+ }
+
+ private long numberOfStatementsInMemory()
+ {
+ return QueryProcessor.preparedStatementsCount();
+ }
+
+ private long numberOfEvictedStatements()
+ {
+ return QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+ }
+
+ private MD5Digest prepareStatement(String stmt, ClientState clientState)
+ {
+ return prepareStatement(stmt, keyspace(), currentTable(), clientState);
+ }
+
+ private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
+ {
+ return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org