You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/06/28 19:19:23 UTC
[1/3] cassandra git commit: Properly evict pstmts from prepared
statements cache
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 bb7e522b4 -> 9562b9b69
refs/heads/trunk 26e025804 -> 9c6f87c35
Properly evict pstmts from prepared statements cache
patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-13641
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9562b9b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9562b9b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9562b9b6
Branch: refs/heads/cassandra-3.11
Commit: 9562b9b69e08b84ec1e8e431a846548fa8a83b44
Parents: bb7e522
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 28 21:15:03 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 28 21:15:03 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 9 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 ++
test/conf/cassandra.yaml | 1 +
.../cassandra/cql3/PstmtPersistenceTest.java | 108 ++++++++++++++-----
5 files changed, 100 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4297a15..88aa1ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.1
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
* Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
* Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f5ce7e4..0e0ba3c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -88,6 +88,7 @@ public class QueryProcessor implements QueryHandler
.listener((md5Digest, prepared) -> {
metrics.preparedStatementsEvicted.inc();
lastMinuteEvictionsCount.incrementAndGet();
+ SystemKeyspace.removePreparedStatement(md5Digest);
}).build();
thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>()
@@ -162,11 +163,17 @@ public class QueryProcessor implements QueryHandler
logger.info("Preloaded {} prepared statements", count);
}
+ /**
+ * Clears the prepared statement cache.
+ * @param memoryOnly {@code true} if only the in memory caches must be cleared, {@code false} otherwise.
+ */
@VisibleForTesting
- public static void clearPrepraredStatements()
+ public static void clearPreparedStatements(boolean memoryOnly)
{
preparedStatements.clear();
thriftPreparedStatements.clear();
+ if (!memoryOnly)
+ SystemKeyspace.resetPreparedStatements();
}
private static QueryState internalQueryState()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 82c9752..6c45329 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1488,6 +1488,12 @@ public final class SystemKeyspace
key.byteBuffer());
}
+ public static void resetPreparedStatements()
+ {
+ ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS);
+ availableRanges.truncateBlocking();
+ }
+
public static List<Pair<String, String>> loadPreparedStatements()
{
String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index cf02634..96ca9a0 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -44,3 +44,4 @@ row_cache_class_name: org.apache.cassandra.cache.OHCProvider
row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
+prepared_statements_cache_size_mb: 1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 380dbda..e7adc8e 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.junit.Before;
import org.junit.Test;
import junit.framework.Assert;
@@ -36,16 +37,23 @@ import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MD5Digest;
+import static org.junit.Assert.*;
+
public class PstmtPersistenceTest extends CQLTester
{
+ @Before
+ public void setUp()
+ {
+ QueryProcessor.clearPreparedStatements(false);
+ }
+
@Test
public void testCachedPreparedStatements() throws Throwable
{
// need this for pstmt execution/validation tests
requireNetwork();
- int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size();
- Assert.assertEquals(0, rows);
+ assertEquals(0, numberOfStatementsOnDisk());
execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)");
@@ -56,30 +64,27 @@ public class PstmtPersistenceTest extends CQLTester
List<MD5Digest> stmtIds = new ArrayList<>();
// #0
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaConstants.SCHEMA_KEYSPACE_NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
// #1
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #2
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState));
clientState.setKeyspace("foo");
// #3
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #4
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %S WHERE key = ?", "foo", "bar", clientState));
- Assert.assertEquals(5, stmtIds.size());
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
-
- String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
+ assertEquals(5, stmtIds.size());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(5, rows);
+ Assert.assertEquals(5, numberOfStatementsOnDisk());
QueryHandler handler = ClientState.getCQLQueryHandler();
validatePstmts(stmtIds, handler);
// clear prepared statements cache
- QueryProcessor.clearPrepraredStatements();
+ QueryProcessor.clearPreparedStatements(true);
Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
for (MD5Digest stmtId : stmtIds)
Assert.assertNull(handler.getPrepared(stmtId));
@@ -88,7 +93,9 @@ public class PstmtPersistenceTest extends CQLTester
QueryProcessor.preloadPreparedStatement();
validatePstmts(stmtIds, handler);
+
// validate that the prepared statements are in the system table
+ String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll))
{
MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id")));
@@ -97,22 +104,19 @@ public class PstmtPersistenceTest extends CQLTester
}
// add anther prepared statement and sync it to table
- QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false);
- Assert.assertEquals(6, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(6, rows);
+ prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState);
+ assertEquals(6, numberOfStatementsInMemory());
+ assertEquals(6, numberOfStatementsOnDisk());
// drop a keyspace (prepared statements are removed - syncPreparedStatements() remove should the rows, too)
execute("DROP KEYSPACE foo");
- Assert.assertEquals(3, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(3, rows);
-
+ assertEquals(3, numberOfStatementsInMemory());
+ assertEquals(3, numberOfStatementsOnDisk());
}
private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler)
{
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar")));
QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42)));
validatePstmt(handler, stmtIds.get(0), optionsStr);
@@ -125,7 +129,63 @@ public class PstmtPersistenceTest extends CQLTester
private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options)
{
ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
- Assert.assertNotNull(prepared);
+ assertNotNull(prepared);
handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
}
+
+ @Test
+ public void testPstmtInvalidation() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ for (int cnt = 1; cnt < 10000; cnt++)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);
+
+ if (numberOfEvictedStatements() > 0)
+ {
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ // prepare more statements to trigger more evictions
+ for (int cnt2 = 1; cnt2 < 10; cnt2++)
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);
+
+ // each new prepared statement should have caused an eviction
+ assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ return;
+ }
+ }
+
+ fail("Prepared statement eviction does not work");
+ }
+
+ private long numberOfStatementsOnDisk() throws Throwable
+ {
+ UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
+ return row.getLong("count");
+ }
+
+ private long numberOfStatementsInMemory()
+ {
+ return QueryProcessor.preparedStatementsCount();
+ }
+
+ private long numberOfEvictedStatements()
+ {
+ return QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+ }
+
+ private MD5Digest prepareStatement(String stmt, ClientState clientState)
+ {
+ return prepareStatement(stmt, keyspace(), currentTable(), clientState);
+ }
+
+ private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
+ {
+ return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Properly evict pstmts from prepared
statements cache
Posted by sn...@apache.org.
Properly evict pstmts from prepared statements cache
patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-13641
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9562b9b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9562b9b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9562b9b6
Branch: refs/heads/trunk
Commit: 9562b9b69e08b84ec1e8e431a846548fa8a83b44
Parents: bb7e522
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 28 21:15:03 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 28 21:15:03 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 9 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 ++
test/conf/cassandra.yaml | 1 +
.../cassandra/cql3/PstmtPersistenceTest.java | 108 ++++++++++++++-----
5 files changed, 100 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4297a15..88aa1ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.1
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
* Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
* Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f5ce7e4..0e0ba3c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -88,6 +88,7 @@ public class QueryProcessor implements QueryHandler
.listener((md5Digest, prepared) -> {
metrics.preparedStatementsEvicted.inc();
lastMinuteEvictionsCount.incrementAndGet();
+ SystemKeyspace.removePreparedStatement(md5Digest);
}).build();
thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>()
@@ -162,11 +163,17 @@ public class QueryProcessor implements QueryHandler
logger.info("Preloaded {} prepared statements", count);
}
+ /**
+ * Clears the prepared statement cache.
+ * @param memoryOnly {@code true} if only the in memory caches must be cleared, {@code false} otherwise.
+ */
@VisibleForTesting
- public static void clearPrepraredStatements()
+ public static void clearPreparedStatements(boolean memoryOnly)
{
preparedStatements.clear();
thriftPreparedStatements.clear();
+ if (!memoryOnly)
+ SystemKeyspace.resetPreparedStatements();
}
private static QueryState internalQueryState()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 82c9752..6c45329 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1488,6 +1488,12 @@ public final class SystemKeyspace
key.byteBuffer());
}
+ public static void resetPreparedStatements()
+ {
+ ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS);
+ availableRanges.truncateBlocking();
+ }
+
public static List<Pair<String, String>> loadPreparedStatements()
{
String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index cf02634..96ca9a0 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -44,3 +44,4 @@ row_cache_class_name: org.apache.cassandra.cache.OHCProvider
row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
+prepared_statements_cache_size_mb: 1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 380dbda..e7adc8e 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.junit.Before;
import org.junit.Test;
import junit.framework.Assert;
@@ -36,16 +37,23 @@ import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MD5Digest;
+import static org.junit.Assert.*;
+
public class PstmtPersistenceTest extends CQLTester
{
+ @Before
+ public void setUp()
+ {
+ QueryProcessor.clearPreparedStatements(false);
+ }
+
@Test
public void testCachedPreparedStatements() throws Throwable
{
// need this for pstmt execution/validation tests
requireNetwork();
- int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size();
- Assert.assertEquals(0, rows);
+ assertEquals(0, numberOfStatementsOnDisk());
execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)");
@@ -56,30 +64,27 @@ public class PstmtPersistenceTest extends CQLTester
List<MD5Digest> stmtIds = new ArrayList<>();
// #0
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaConstants.SCHEMA_KEYSPACE_NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
// #1
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #2
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState));
clientState.setKeyspace("foo");
// #3
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
// #4
- stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+ stmtIds.add(prepareStatement("SELECT * FROM %S WHERE key = ?", "foo", "bar", clientState));
- Assert.assertEquals(5, stmtIds.size());
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
-
- String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
+ assertEquals(5, stmtIds.size());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(5, rows);
+ Assert.assertEquals(5, numberOfStatementsOnDisk());
QueryHandler handler = ClientState.getCQLQueryHandler();
validatePstmts(stmtIds, handler);
// clear prepared statements cache
- QueryProcessor.clearPrepraredStatements();
+ QueryProcessor.clearPreparedStatements(true);
Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
for (MD5Digest stmtId : stmtIds)
Assert.assertNull(handler.getPrepared(stmtId));
@@ -88,7 +93,9 @@ public class PstmtPersistenceTest extends CQLTester
QueryProcessor.preloadPreparedStatement();
validatePstmts(stmtIds, handler);
+
// validate that the prepared statements are in the system table
+ String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll))
{
MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id")));
@@ -97,22 +104,19 @@ public class PstmtPersistenceTest extends CQLTester
}
// add anther prepared statement and sync it to table
- QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false);
- Assert.assertEquals(6, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(6, rows);
+ prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState);
+ assertEquals(6, numberOfStatementsInMemory());
+ assertEquals(6, numberOfStatementsOnDisk());
// drop a keyspace (prepared statements are removed - syncPreparedStatements() remove should the rows, too)
execute("DROP KEYSPACE foo");
- Assert.assertEquals(3, QueryProcessor.preparedStatementsCount());
- rows = QueryProcessor.executeOnceInternal(queryAll).size();
- Assert.assertEquals(3, rows);
-
+ assertEquals(3, numberOfStatementsInMemory());
+ assertEquals(3, numberOfStatementsOnDisk());
}
private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler)
{
- Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
+ assertEquals(5, QueryProcessor.preparedStatementsCount());
QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar")));
QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42)));
validatePstmt(handler, stmtIds.get(0), optionsStr);
@@ -125,7 +129,63 @@ public class PstmtPersistenceTest extends CQLTester
private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options)
{
ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
- Assert.assertNotNull(prepared);
+ assertNotNull(prepared);
handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
}
+
+ @Test
+ public void testPstmtInvalidation() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ for (int cnt = 1; cnt < 10000; cnt++)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);
+
+ if (numberOfEvictedStatements() > 0)
+ {
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ // prepare more statements to trigger more evictions
+ for (int cnt2 = 1; cnt2 < 10; cnt2++)
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);
+
+ // each new prepared statement should have caused an eviction
+ assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ return;
+ }
+ }
+
+ fail("Prepared statement eviction does not work");
+ }
+
+ private long numberOfStatementsOnDisk() throws Throwable
+ {
+ UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
+ return row.getLong("count");
+ }
+
+ private long numberOfStatementsInMemory()
+ {
+ return QueryProcessor.preparedStatementsCount();
+ }
+
+ private long numberOfEvictedStatements()
+ {
+ return QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+ }
+
+ private MD5Digest prepareStatement(String stmt, ClientState clientState)
+ {
+ return prepareStatement(stmt, keyspace(), currentTable(), clientState);
+ }
+
+ private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
+ {
+ return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by sn...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c6f87c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c6f87c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c6f87c3
Branch: refs/heads/trunk
Commit: 9c6f87c35f364ec6a88775cb3d0cf143e36635e7
Parents: 26e0258 9562b9b
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 28 21:17:06 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 28 21:17:06 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 20 +++-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 +
test/conf/cassandra.yaml | 1 +
.../cassandra/cql3/PstmtPersistenceTest.java | 110 ++++++++++++++-----
5 files changed, 107 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6f87c3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 04640ab,88aa1ef..6ffd11a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,90 -1,8 +1,91 @@@
+4.0
+ * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
+ * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
+ * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
+ * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
+ * Fix Randomness of stress values (CASSANDRA-12744)
+ * Allow selecting Map values and Set elements (CASSANDRA-7396)
+ * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
+ * Update repairTime for keyspaces on completion (CASSANDRA-13539)
+ * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
+ * Bring back maxHintTTL property (CASSANDRA-12982)
+ * Add testing guidelines (CASSANDRA-13497)
+ * Add more repair metrics (CASSANDRA-13531)
+ * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
+ * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
+ * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
+ * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
+ * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
+ * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
+ * Improve incremental repair logging (CASSANDRA-13468)
+ * Start compaction when incremental repair finishes (CASSANDRA-13454)
+ * Add repair streaming preview (CASSANDRA-13257)
+ * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
+ * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
+ * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
+ * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
+ * Skip building views during base table streams on range movements (CASSANDRA-13065)
+ * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
+ * Remove deprecated repair JMX APIs (CASSANDRA-11530)
+ * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
+ * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
+ * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
+ * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
+ * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
+ * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
+ * Incremental repair not streaming correct sstables (CASSANDRA-13328)
+ * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
+ * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
+ * Remove config option index_interval (CASSANDRA-10671)
+ * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
+ * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
+ * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
+ * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
+ * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
+ * Remove unused method (CASSANDRA-13227)
+ * Fix minor bugs related to #9143 (CASSANDRA-13217)
+ * Output warning if user increases RF (CASSANDRA-13079)
+ * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
+ * Add support for + and - operations on dates (CASSANDRA-11936)
+ * Fix consistency of incrementally repaired data (CASSANDRA-9143)
+ * Increase commitlog version (CASSANDRA-13161)
+ * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
+ * Refactor ColumnCondition (CASSANDRA-12981)
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
+ * Improved compactions metrics (CASSANDRA-13015)
+ * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
+ * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
+ * Thrift removal (CASSANDRA-11115)
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+ * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
+ * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
+ * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
+ * Allow IN restrictions on column families with collections (CASSANDRA-12654)
+ * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
+ * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
+ * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
+ * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
+ * Add support for arithmetic operators (CASSANDRA-11935)
+ * Add histogram for delay to deliver hints (CASSANDRA-13234)
+ * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
+ * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
+ * Trivial format error in StorageProxy (CASSANDRA-13551)
+
+
3.11.1
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
- * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
- * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
+ * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
+ * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
3.11.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6f87c3/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index cca93ff,0e0ba3c..ade98e7
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -24,9 -24,6 +24,8 @@@ import java.util.concurrent.ConcurrentM
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
- import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@@ -83,17 -81,25 +82,20 @@@ public class QueryProcessor implements
static
{
- preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
- .maximumWeightedCapacity(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
+ preparedStatements = Caffeine.newBuilder()
+ .executor(MoreExecutors.directExecutor())
+ .maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
.weigher(QueryProcessor::measure)
- .removalListener((md5Digest, prepared, cause) -> {
- if (cause == RemovalCause.SIZE) {
- .listener((md5Digest, prepared) -> {
- metrics.preparedStatementsEvicted.inc();
- lastMinuteEvictionsCount.incrementAndGet();
- SystemKeyspace.removePreparedStatement(md5Digest);
++ .removalListener((key, prepared, cause) -> {
++ MD5Digest md5Digest = (MD5Digest) key;
++ if (cause.wasEvicted())
++ {
+ metrics.preparedStatementsEvicted.inc();
- lastMinuteEvictionsCount.incrementAndGet();
++ lastMinuteEvictionsCount.incrementAndGet();
++ SystemKeyspace.removePreparedStatement(md5Digest);
+ }
}).build();
- thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>()
- .maximumWeightedCapacity(capacityToBytes(DatabaseDescriptor.getThriftPreparedStatementsCacheSizeMB()))
- .weigher(QueryProcessor::measure)
- .listener((integer, prepared) -> {
- metrics.preparedStatementsEvicted.inc();
- thriftLastMinuteEvictionsCount.incrementAndGet();
- })
- .build();
-
ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
long count = lastMinuteEvictionsCount.getAndSet(0);
if (count > 0)
@@@ -151,10 -163,17 +153,16 @@@
logger.info("Preloaded {} prepared statements", count);
}
+ /**
+ * Clears the prepared statement cache.
+ * @param memoryOnly {@code true} if only the in-memory caches must be cleared, {@code false} otherwise.
+ */
@VisibleForTesting
- public static void clearPrepraredStatements()
+ public static void clearPreparedStatements(boolean memoryOnly)
{
- preparedStatements.invalidateAll();;
- preparedStatements.clear();
- thriftPreparedStatements.clear();
++ preparedStatements.invalidateAll();
+ if (!memoryOnly)
+ SystemKeyspace.resetPreparedStatements();
}
private static QueryState internalQueryState()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6f87c3/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0d64a94,6c45329..6b278a5
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1344,9 -1488,15 +1344,15 @@@ public final class SystemKeyspac
key.byteBuffer());
}
+ public static void resetPreparedStatements()
+ {
+ ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS);
+ availableRanges.truncateBlocking();
+ }
+
public static List<Pair<String, String>> loadPreparedStatements()
{
- String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
+ String query = format("SELECT logged_keyspace, query_string FROM %s", PreparedStatements.toString());
UntypedResultSet resultSet = executeOnceInternal(query);
List<Pair<String, String>> r = new ArrayList<>();
for (UntypedResultSet.Row row : resultSet)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6f87c3/test/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6f87c3/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index fb577ec,e7adc8e..228352c
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@@ -25,11 -26,11 +26,11 @@@ import org.junit.Before
import org.junit.Test;
import junit.framework.Assert;
- import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
++import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
@@@ -125,7 -129,63 +129,63 @@@ public class PstmtPersistenceTest exten
private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options)
{
ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
- Assert.assertNotNull(prepared);
+ assertNotNull(prepared);
handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
}
+
+ @Test
+ public void testPstmtInvalidation() throws Throwable
+ {
+ ClientState clientState = ClientState.forInternalCalls();
+
+ createTable("CREATE TABLE %s (key int primary key, val int)");
+
+ for (int cnt = 1; cnt < 10000; cnt++)
+ {
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);
+
+ if (numberOfEvictedStatements() > 0)
+ {
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ // prepare more statements to trigger more evictions
+ for (int cnt2 = 1; cnt2 < 10; cnt2++)
+ prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);
+
+ // each new prepared statement should have caused an eviction
+ assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
+ assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+ return;
+ }
+ }
+
+ fail("Prepared statement eviction does not work");
+ }
+
+ private long numberOfStatementsOnDisk() throws Throwable
+ {
+ UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
+ return row.getLong("count");
+ }
+
+ private long numberOfStatementsInMemory()
+ {
+ return QueryProcessor.preparedStatementsCount();
+ }
+
+ private long numberOfEvictedStatements()
+ {
+ return QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+ }
+
+ private MD5Digest prepareStatement(String stmt, ClientState clientState)
+ {
+ return prepareStatement(stmt, keyspace(), currentTable(), clientState);
+ }
+
+ private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
+ {
- return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
++ return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org