You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/09/14 15:37:41 UTC
nifi git commit: NIFI-3425: Provide ability to cache CQL statements
Repository: nifi
Updated Branches:
refs/heads/master 14729be83 -> 4a25402c1
NIFI-3425: Provide ability to cache CQL statements
This closes #2986.
Co-authored-by: Michael A Giroux <ma...@rd6ul-92373g.infosec.tycho.ncsc.mil>
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4a25402c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4a25402c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4a25402c
Branch: refs/heads/master
Commit: 4a25402c1af7e51b1a7bf61f185dc58b5c40e90b
Parents: 14729be
Author: Matthew Burgess <ma...@apache.org>
Authored: Tue Sep 4 12:51:52 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Sep 14 11:37:18 2018 -0400
----------------------------------------------------------------------
.../processors/cassandra/PutCassandraQL.java | 49 +++++++++++++++----
.../cassandra/PutCassandraQLTest.java | 50 ++++++++++++++++++--
2 files changed, 86 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4a25402c/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index ccedb1b..d6f13de 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -27,6 +27,8 @@ import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -37,8 +39,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnShutdown;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -63,6 +64,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
@@ -94,6 +96,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
public static final PropertyDescriptor STATEMENT_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
+ .displayName("Max Wait Time")
.description("The maximum amount of time allowed for a running CQL select query. Must be of format "
+ "<duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported "
+ "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
@@ -103,6 +106,17 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
+ public static final PropertyDescriptor STATEMENT_CACHE_SIZE = new PropertyDescriptor.Builder()
+ .name("putcql-stmt-cache-size")
+ .displayName("Statement Cache Size")
+ .description("The maximum number of CQL Prepared Statements to cache. This can improve performance if many incoming flow files have the same CQL statement "
+ + "with different values for the parameters. If this property is set to zero, the cache is effectively disabled.")
+ .defaultValue("0")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
private final static List<PropertyDescriptor> propertyDescriptors;
// Relationships
@@ -126,6 +140,12 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
// Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>)
private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
+ /**
+ * Size-bounded cache of prepared CQL statements, keyed by the CQL text. The maximum size of the cache is determined by the value of the Statement Cache Size property
+ */
+ @VisibleForTesting
+ private ConcurrentMap<String, PreparedStatement> statementCache;
+
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
@@ -134,6 +154,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(STATEMENT_TIMEOUT);
+ _propertyDescriptors.add(STATEMENT_CACHE_SIZE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@@ -157,8 +178,17 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
ComponentLog log = getLogger();
+
+ // Initialize the prepared statement cache
+ int statementCacheSize = context.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+ statementCache = CacheBuilder.newBuilder()
+ .maximumSize(statementCacheSize)
+ .<String, PreparedStatement>build()
+ .asMap();
+
try {
connectToCassandra(context);
+
} catch (final NoHostAvailableException nhae) {
log.error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
// Log up to 10 error messages. Otherwise if a 1000-node cluster was specified but there was no connectivity,
@@ -191,7 +221,11 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
String cql = getCQL(session, flowFile, charset);
try {
- PreparedStatement statement = connectionSession.prepare(cql);
+ PreparedStatement statement = statementCache.get(cql);
+ if(statement == null) {
+ statement = connectionSession.prepare(cql);
+ statementCache.put(cql, statement);
+ }
BoundStatement boundStatement = statement.bind();
Map<String, String> attributes = flowFile.getAttributes();
@@ -396,14 +430,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
}
- @OnUnscheduled
+ @OnStopped
public void stop() {
super.stop();
+ statementCache.clear();
}
-
- @OnShutdown
- public void shutdown() {
- super.stop();
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4a25402c/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index de66235..60c3cda 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -60,7 +60,7 @@ public class PutCassandraQLTest {
private MockPutCassandraQL processor;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
processor = new MockPutCassandraQL();
testRunner = TestRunners.newTestRunner(processor);
}
@@ -172,6 +172,50 @@ public class PutCassandraQLTest {
}
@Test
+ public void testMultipleQuery() {
+ setUpStandardTestConfig();
+ testRunner.setProperty(PutCassandraQL.STATEMENT_CACHE_SIZE, "1");
+
+ HashMap<String, String> testData = new HashMap<>();
+ testData.put("cql.args.1.type", "int");
+ testData.put("cql.args.1.value", "1");
+ testData.put("cql.args.2.type", "text");
+ testData.put("cql.args.2.value", "Joe");
+ testData.put("cql.args.3.type", "text");
+ // No value for arg 3 to test setNull
+ testData.put("cql.args.4.type", "map<text,text>");
+ testData.put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
+ testData.put("cql.args.5.type", "list<boolean>");
+ testData.put("cql.args.5.value", "[true,false,true]");
+ testData.put("cql.args.6.type", "set<double>");
+ testData.put("cql.args.6.value", "{1.0, 2.0}");
+ testData.put("cql.args.7.type", "bigint");
+ testData.put("cql.args.7.value", "20000000");
+ testData.put("cql.args.8.type", "float");
+ testData.put("cql.args.8.value", "1.0");
+ testData.put("cql.args.9.type", "blob");
+ testData.put("cql.args.9.value", "0xDEADBEEF");
+ testData.put("cql.args.10.type", "timestamp");
+ testData.put("cql.args.10.value", "2016-07-01T15:21:05Z");
+
+ testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+ testData);
+
+ testRunner.enqueue("INSERT INTO newusers (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+ testData);
+
+ // Change it up a bit, the same statement is executed with different data
+ testData.put("cql.args.1.value", "2");
+ testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+ testData);
+
+ testRunner.enqueue("INSERT INTO users (user_id) VALUES ('user_id data');");
+
+ testRunner.run(4, true, true);
+ testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 4);
+ }
+
+ @Test
public void testProcessorBadTimestamp() {
setUpStandardTestConfig();
processor.setExceptionToThrow(
@@ -236,7 +280,7 @@ public class PutCassandraQLTest {
public void testProcessorNoHostAvailableException() {
setUpStandardTestConfig();
- processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<InetSocketAddress, Throwable>()));
+ processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<>()));
testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';");
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_RETRY, 1);
@@ -307,7 +351,7 @@ public class PutCassandraQLTest {
return mockCluster;
}
- public void setExceptionToThrow(Exception e) {
+ void setExceptionToThrow(Exception e) {
exceptionToThrow = e;
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));