You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/09/14 15:37:41 UTC

nifi git commit: NIFI-3425: Provide ability to cache CQL statements

Repository: nifi
Updated Branches:
  refs/heads/master 14729be83 -> 4a25402c1


NIFI-3425: Provide ability to cache CQL statements

This closes #2986.

Co-authored-by: Michael A Giroux <ma...@rd6ul-92373g.infosec.tycho.ncsc.mil>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4a25402c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4a25402c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4a25402c

Branch: refs/heads/master
Commit: 4a25402c1af7e51b1a7bf61f185dc58b5c40e90b
Parents: 14729be
Author: Matthew Burgess <ma...@apache.org>
Authored: Tue Sep 4 12:51:52 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Sep 14 11:37:18 2018 -0400

----------------------------------------------------------------------
 .../processors/cassandra/PutCassandraQL.java    | 49 +++++++++++++++----
 .../cassandra/PutCassandraQLTest.java           | 50 ++++++++++++++++++--
 2 files changed, 86 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4a25402c/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index ccedb1b..d6f13de 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -27,6 +27,8 @@ import com.datastax.driver.core.exceptions.InvalidTypeException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.QueryExecutionException;
 import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -37,8 +39,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnShutdown;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -63,6 +64,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
@@ -94,6 +96,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
 
     public static final PropertyDescriptor STATEMENT_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Max Wait Time")
+            .displayName("Max Wait Time")
             .description("The maximum amount of time allowed for a running CQL select query. Must be of format "
                     + "<duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported "
                     + "Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ")
@@ -103,6 +106,17 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor STATEMENT_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("putcql-stmt-cache-size")
+            .displayName("Statement Cache Size")
+            .description("The maximum number of CQL Prepared Statements to cache. This can improve performance if many incoming flow files have the same CQL statement "
+                    + "with different values for the parameters. If this property is set to zero, the cache is effectively disabled.")
+            .defaultValue("0")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
     private final static List<PropertyDescriptor> propertyDescriptors;
 
     // Relationships
@@ -126,6 +140,12 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
     // Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>)
     private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
 
+    /**
+     * LRU cache for the prepared CQL statements. The size of the cache is determined by the value of the Statement Cache Size property
+     */
+    @VisibleForTesting
+    private ConcurrentMap<String, PreparedStatement> statementCache;
+
     /*
      * Will ensure that the list of property descriptors is build only once.
      * Will also create a Set of relationships
@@ -134,6 +154,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(STATEMENT_TIMEOUT);
+        _propertyDescriptors.add(STATEMENT_CACHE_SIZE);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -157,8 +178,17 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         ComponentLog log = getLogger();
+
+        // Initialize the prepared statement cache
+        int statementCacheSize = context.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        statementCache =  CacheBuilder.newBuilder()
+                .maximumSize(statementCacheSize)
+                .<String, PreparedStatement>build()
+                .asMap();
+
         try {
             connectToCassandra(context);
+
         } catch (final NoHostAvailableException nhae) {
             log.error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
             // Log up to 10 error messages. Otherwise if a 1000-node cluster was specified but there was no connectivity,
@@ -191,7 +221,11 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
 
         String cql = getCQL(session, flowFile, charset);
         try {
-            PreparedStatement statement = connectionSession.prepare(cql);
+            PreparedStatement statement = statementCache.get(cql);
+            if(statement == null) {
+                statement = connectionSession.prepare(cql);
+                statementCache.put(cql, statement);
+            }
             BoundStatement boundStatement = statement.bind();
 
             Map<String, String> attributes = flowFile.getAttributes();
@@ -396,14 +430,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
         throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
     }
 
-    @OnUnscheduled
+    @OnStopped
     public void stop() {
         super.stop();
+        statementCache.clear();
     }
-
-    @OnShutdown
-    public void shutdown() {
-        super.stop();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4a25402c/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index de66235..60c3cda 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -60,7 +60,7 @@ public class PutCassandraQLTest {
     private MockPutCassandraQL processor;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         processor = new MockPutCassandraQL();
         testRunner = TestRunners.newTestRunner(processor);
     }
@@ -172,6 +172,50 @@ public class PutCassandraQLTest {
     }
 
     @Test
+    public void testMultipleQuery() {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraQL.STATEMENT_CACHE_SIZE, "1");
+
+        HashMap<String, String> testData = new HashMap<>();
+        testData.put("cql.args.1.type", "int");
+        testData.put("cql.args.1.value", "1");
+        testData.put("cql.args.2.type", "text");
+        testData.put("cql.args.2.value", "Joe");
+        testData.put("cql.args.3.type", "text");
+        // No value for arg 3 to test setNull
+        testData.put("cql.args.4.type", "map<text,text>");
+        testData.put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
+        testData.put("cql.args.5.type", "list<boolean>");
+        testData.put("cql.args.5.value", "[true,false,true]");
+        testData.put("cql.args.6.type", "set<double>");
+        testData.put("cql.args.6.value", "{1.0, 2.0}");
+        testData.put("cql.args.7.type", "bigint");
+        testData.put("cql.args.7.value", "20000000");
+        testData.put("cql.args.8.type", "float");
+        testData.put("cql.args.8.value", "1.0");
+        testData.put("cql.args.9.type", "blob");
+        testData.put("cql.args.9.value", "0xDEADBEEF");
+        testData.put("cql.args.10.type", "timestamp");
+        testData.put("cql.args.10.value", "2016-07-01T15:21:05Z");
+
+        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+                testData);
+
+        testRunner.enqueue("INSERT INTO newusers (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+                testData);
+
+        // Change it up a bit, the same statement is executed with different data
+        testData.put("cql.args.1.value", "2");
+        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
+                testData);
+
+        testRunner.enqueue("INSERT INTO users (user_id) VALUES ('user_id data');");
+
+        testRunner.run(4, true, true);
+        testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 4);
+    }
+
+    @Test
     public void testProcessorBadTimestamp() {
         setUpStandardTestConfig();
         processor.setExceptionToThrow(
@@ -236,7 +280,7 @@ public class PutCassandraQLTest {
     public void testProcessorNoHostAvailableException() {
         setUpStandardTestConfig();
 
-        processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<InetSocketAddress, Throwable>()));
+        processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<>()));
         testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';");
         testRunner.run(1, true, true);
         testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_RETRY, 1);
@@ -307,7 +351,7 @@ public class PutCassandraQLTest {
             return mockCluster;
         }
 
-        public void setExceptionToThrow(Exception e) {
+        void setExceptionToThrow(Exception e) {
             exceptionToThrow = e;
             doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
             doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));