You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/10/31 18:40:23 UTC

[nifi] branch main updated: NIFI-10729 Added Cassandra testcontainers.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a45d4ac89 NIFI-10729 Added Cassandra testcontainers.
2a45d4ac89 is described below

commit 2a45d4ac8928194e351faee1067ddce07a155ad0
Author: Mike Thomsen <mt...@apache.org>
AuthorDate: Sat Oct 29 19:40:52 2022 -0400

    NIFI-10729 Added Cassandra testcontainers.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #6604
---
 .../pom.xml                                        | 13 ++++++++++++
 .../cassandra/CassandraDistributedMapCache.java    |  5 +++++
 .../nifi/CassandraDistributedMapCacheIT.groovy     | 24 ++++++++++++++++++++--
 .../nifi-cassandra-processors/pom.xml              | 13 ++++++++++++
 .../processors/cassandra/PutCassandraRecordIT.java | 16 +++++++++++----
 5 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
index cbd9125d5f..848d3625fd 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
@@ -98,5 +98,18 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
index a921ec1ad9..64bc38b079 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
@@ -33,6 +33,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.ByteArrayOutputStream;
@@ -66,6 +67,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
             .description("The name of the table where the cache will be stored.")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder()
@@ -74,6 +76,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
             .description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder()
@@ -82,6 +85,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
             .description("The name of the field that will store the value. (The CQL type should be \"blob\")")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
@@ -90,6 +94,7 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
             .description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " +
                     "old cache items expire after a certain period of time.")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
             .build();
 
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
index 459106dd17..2fdc3e6657 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi
 
+import com.datastax.driver.core.Cluster
 import com.datastax.driver.core.Session
 import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
 import org.apache.nifi.distributed.cache.client.Deserializer
@@ -30,6 +31,10 @@ import org.apache.nifi.util.TestRunners
 import org.junit.jupiter.api.AfterAll
 import org.junit.jupiter.api.BeforeAll
 import org.junit.jupiter.api.Test
+import org.testcontainers.containers.CassandraContainer
+import org.testcontainers.junit.jupiter.Container
+import org.testcontainers.junit.jupiter.Testcontainers
+import org.testcontainers.utility.DockerImageName
 
 /**
  * Setup instructions:
@@ -42,11 +47,16 @@ import org.junit.jupiter.api.Test
  *
  * Table SQL: create table dmc (id blob, value blob, primary key(id));
  */
+@Testcontainers
 class CassandraDistributedMapCacheIT {
+    @Container
+    static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1"))
     static TestRunner runner
     static CassandraDistributedMapCache distributedMapCache
     static Session session
 
+    static final String KEYSPACE = "sample_keyspace"
+
     @BeforeAll
     static void setup() {
         runner = TestRunners.newTestRunner(new AbstractProcessor() {
@@ -57,10 +67,20 @@ class CassandraDistributedMapCacheIT {
         })
         distributedMapCache = new CassandraDistributedMapCache()
 
+        InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint()
+        String connectionString = String.format("%s:%d", contactPoint.getHostName(), contactPoint.getPort())
+
+        Cluster cluster = Cluster.builder().addContactPoint(contactPoint.getHostName())
+                .withPort(contactPoint.getPort()).build();
+        session = cluster.connect();
+
+        session.execute("create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' }");
+        session.execute("create table nifi_test.dmc (id blob, value blob, primary key(id))");
+
         def cassandraService = new CassandraSessionProvider()
         runner.addControllerService("provider", cassandraService)
         runner.addControllerService("dmc", distributedMapCache)
-        runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042")
+        runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, connectionString)
         runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test")
         runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider")
         runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc")
@@ -85,7 +105,7 @@ class CassandraDistributedMapCacheIT {
 
     @AfterAll
     static void cleanup() {
-        session.execute("TRUNCATE dmc")
+        session.execute("TRUNCATE nifi_test.dmc")
     }
 
     Serializer<String> serializer = { str, os ->
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
index f7f9ad7873..84fee14a78 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
@@ -93,5 +93,18 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-text</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
index cee7fd1656..f0943c2afb 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
@@ -30,13 +30,21 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@Testcontainers
 public class PutCassandraRecordIT {
+    @Container
+    private static final CassandraContainer CASSANDRA_CONTAINER = new CassandraContainer(DockerImageName.parse("cassandra:4.1"));
 
     private static TestRunner testRunner;
     private static MockRecordParser recordReader;
@@ -46,16 +54,15 @@ public class PutCassandraRecordIT {
 
     private static final String KEYSPACE = "sample_keyspace";
     private static final String TABLE = "sample_table";
-    private static final String HOST = "localhost";
-    private static final int PORT = 9042;
 
     @BeforeAll
     public static void setup() throws InitializationException {
         recordReader = new MockRecordParser();
         testRunner = TestRunners.newTestRunner(PutCassandraRecord.class);
 
+        InetSocketAddress contactPoint = CASSANDRA_CONTAINER.getContactPoint();
         testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader");
-        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, HOST + ":" + PORT);
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, contactPoint.getHostString() + ":" + contactPoint.getPort());
         testRunner.setProperty(PutCassandraRecord.KEYSPACE, KEYSPACE);
         testRunner.setProperty(PutCassandraRecord.TABLE, TABLE);
         testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
@@ -63,7 +70,8 @@ public class PutCassandraRecordIT {
         testRunner.addControllerService("reader", recordReader);
         testRunner.enableControllerService(recordReader);
 
-        cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT).build();
+        cluster = Cluster.builder().addContactPoint(contactPoint.getHostName())
+                .withPort(contactPoint.getPort()).build();
         session = cluster.connect();
 
         String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class':'SimpleStrategy','replication_factor':1};";