You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/11/30 00:02:13 UTC
[nifi] branch main updated: NIFI-7821 Added Cassandra-based DMC.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3625a73 NIFI-7821 Added Cassandra-based DMC.
3625a73 is described below
commit 3625a73d91f9504e7117043402613ddbe3a0cbe9
Author: Mike Thomsen <mt...@apache.org>
AuthorDate: Thu Oct 29 15:02:14 2020 -0400
NIFI-7821 Added Cassandra-based DMC.
NIFI-7821 Updated configuration documentation.
NIFI-7821 Fixed getAndPutIfAbsent and added int test.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4635
---
.../pom.xml | 111 ++++++++++
.../cassandra/CassandraDistributedMapCache.java | 241 +++++++++++++++++++++
.../nifi/controller/cassandra/QueryUtils.java | 44 ++++
.../org.apache.nifi.controller.ControllerService | 15 ++
.../nifi/CassandraDistributedMapCacheIT.groovy | 133 ++++++++++++
.../src/test/java/.gitignore | 0
.../nifi-cassandra-services-nar/pom.xml | 5 +
nifi-nar-bundles/nifi-cassandra-bundle/pom.xml | 1 +
8 files changed, 550 insertions(+)
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
new file mode 100644
index 0000000..75939d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-cassandra-bundle</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-cassandra-distributedmapcache-service</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-cassandra-services-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-cassandra-services</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
new file mode 100644
index 0000000..a921ec1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.cassandra.CassandraSessionProviderService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.controller.cassandra.QueryUtils.createDeleteStatement;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createExistsQuery;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createFetchQuery;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createInsertStatement;
+
+@Tags({"map", "cache", "distributed", "cassandra"})
+@CapabilityDescription("Provides a DistributedMapCache client that is based on Apache Cassandra.")
+public class CassandraDistributedMapCache extends AbstractControllerService implements DistributedMapCacheClient {
+ public static final PropertyDescriptor SESSION_PROVIDER = new PropertyDescriptor.Builder()
+ .name("cassandra-dmc-session-provider")
+ .displayName("Session Provider")
+ .description("The client service that will configure the cassandra client connection.")
+ .required(true)
+ .identifiesControllerService(CassandraSessionProviderService.class)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("cassandra-dmc-table-name")
+ .displayName("Table Name")
+ .description("The name of the table where the cache will be stored.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("cassandra-dmc-key-field-name")
+ .displayName("Key Field Name")
+ .description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("cassandra-dmc-value-field-name")
+ .displayName("Value Field Name")
+ .description("The name of the field that will store the value. (The CQL type should be \"blob\")")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+ .name("cassandra-dmc-ttl")
+ .displayName("TTL")
+ .description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " +
+ "old cache items expire after a certain period of time.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ SESSION_PROVIDER, TABLE_NAME, KEY_FIELD_NAME, VALUE_FIELD_NAME, TTL
+ ));
+
+ private CassandraSessionProviderService sessionProviderService;
+ private String tableName;
+ private String keyField;
+ private String valueField;
+ private Long ttl;
+
+ private Session session;
+ private PreparedStatement deleteStatement;
+ private PreparedStatement existsStatement;
+ private PreparedStatement fetchStatement;
+ private PreparedStatement insertStatement;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @OnEnabled
+ public void onEnabled(ConfigurationContext context) {
+ sessionProviderService = context.getProperty(SESSION_PROVIDER).asControllerService(CassandraSessionProviderService.class);
+ tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+ keyField = context.getProperty(KEY_FIELD_NAME).evaluateAttributeExpressions().getValue();
+ valueField = context.getProperty(VALUE_FIELD_NAME).evaluateAttributeExpressions().getValue();
+ if (context.getProperty(TTL).isSet()) {
+ ttl = context.getProperty(TTL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+ }
+
+ session = sessionProviderService.getCassandraSession();
+
+ deleteStatement = session.prepare(createDeleteStatement(keyField, tableName));
+ existsStatement = session.prepare(createExistsQuery(keyField, tableName));
+ fetchStatement = session.prepare(createFetchQuery(keyField, valueField, tableName));
+ insertStatement = session.prepare(createInsertStatement(keyField, valueField, tableName, ttl));
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ session = null;
+ deleteStatement = null;
+ existsStatement = null;
+ fetchStatement = null;
+ insertStatement = null;
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ if (containsKey(k, keySerializer)) {
+ return false;
+ } else {
+ put(k, v, keySerializer, valueSerializer);
+ return true;
+ }
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> deserializer) throws IOException {
+ V got = get(k, keySerializer, deserializer);
+ boolean wasAbsent = putIfAbsent(k, v, keySerializer, valueSerializer);
+
+ return !wasAbsent ? got : null;
+ }
+
+ @Override
+ public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
+ byte[] key = serializeKey(k, serializer);
+
+ BoundStatement statement = existsStatement.bind();
+ ByteBuffer buffer = ByteBuffer.wrap(key);
+ statement.setBytes(0, buffer);
+ ResultSet rs =session.execute(statement);
+ Iterator<Row> iterator = rs.iterator();
+
+ if (iterator.hasNext()) {
+ Row row = iterator.next();
+ long value = row.getLong("exist_count");
+ return value > 0;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public <K, V> void put(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ BoundStatement statement = insertStatement.bind();
+ statement.setBytes(0, ByteBuffer.wrap(serializeKey(k, keySerializer)));
+ statement.setBytes(1, ByteBuffer.wrap(serializeValue(v, valueSerializer)));
+ session.execute(statement);
+ }
+
+ @Override
+ public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
+ BoundStatement boundStatement = fetchStatement.bind();
+ boundStatement.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer)));
+ ResultSet rs = session.execute(boundStatement);
+ Iterator<Row> iterator = rs.iterator();
+ if (!iterator.hasNext()) {
+ return null;
+ }
+
+ Row fetched = iterator.next();
+ ByteBuffer buffer = fetched.getBytes(valueField);
+
+ byte[] content = buffer.array();
+
+ return deserializer.deserialize(content);
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
+ BoundStatement delete = deleteStatement.bind();
+ delete.setBytes(0, ByteBuffer.wrap(serializeKey(k, serializer)));
+ session.execute(delete);
+
+ return true;
+ }
+
+ @Override
+ public long removeByPattern(String s) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ private <K> byte[] serializeKey(K k, Serializer<K> keySerializer) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ keySerializer.serialize(k, out);
+ out.close();
+
+ return out.toByteArray();
+ }
+
+ private <V> byte[] serializeValue(V v, Serializer<V> valueSerializer) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ valueSerializer.serialize(v, out);
+ out.close();
+
+ return out.toByteArray();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java
new file mode 100644
index 0000000..cc42dd7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cassandra;
+
+public class QueryUtils {
+ private QueryUtils() {}
+
+ public static String createDeleteStatement(String keyField, String table) {
+ return String.format("DELETE FROM %s WHERE %s = ?", table, keyField);
+ }
+
+ public static String createExistsQuery(String keyField, String table) {
+ return String.format("SELECT COUNT(*) as exist_count FROM %s WHERE %s = ?", table, keyField);
+ }
+
+ public static String createFetchQuery(String keyField, String valueField, String table) {
+ return String.format("SELECT %s FROM %s WHERE %s = ?", valueField, table, keyField);
+ }
+
+ public static String createInsertStatement(String keyField, String valueField, String table, Long ttl) {
+ String retVal = String.format("INSERT INTO %s (%s, %s) VALUES(?, ?)", table, keyField, valueField);
+
+ if (ttl != null) {
+ retVal += String.format(" using ttl %d", ttl);
+ }
+
+ return retVal;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..c3d32c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
new file mode 100644
index 0000000..4db419d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi
+
+import com.datastax.driver.core.Session
+import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
+import org.apache.nifi.distributed.cache.client.Deserializer
+import org.apache.nifi.distributed.cache.client.Serializer
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.service.CassandraSessionProvider
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+/**
+ * Setup instructions:
+ *
+ * docker run -p 7000:7000 -p 9042:9042 --name cassandra --restart always -d cassandra:3
+ *
+ * docker exec -it cassandra cqlsh
+ *
+ * Keyspace CQL: create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' } ;
+ *
+ * Table SQL: create table dmc (id blob, value blob, primary key(id));
+ */
+class CassandraDistributedMapCacheIT {
+ static TestRunner runner
+ static CassandraDistributedMapCache distributedMapCache
+ static Session session
+
+ @BeforeClass
+ static void setup() {
+ runner = TestRunners.newTestRunner(new AbstractProcessor() {
+ @Override
+ void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+
+ }
+ })
+ distributedMapCache = new CassandraDistributedMapCache()
+
+ def cassandraService = new CassandraSessionProvider()
+ runner.addControllerService("provider", cassandraService)
+ runner.addControllerService("dmc", distributedMapCache)
+ runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042")
+ runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test")
+ runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider")
+ runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc")
+ runner.setProperty(distributedMapCache, CassandraDistributedMapCache.KEY_FIELD_NAME, "id")
+ runner.setProperty(distributedMapCache, CassandraDistributedMapCache.VALUE_FIELD_NAME, "value")
+ runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TTL, "5 sec")
+ runner.enableControllerService(cassandraService)
+ runner.enableControllerService(distributedMapCache)
+ runner.assertValid()
+
+ session = cassandraService.getCassandraSession();
+ session.execute("""
+ INSERT INTO dmc (id, value) VALUES(textAsBlob('contains-key'), textAsBlob('testvalue'))
+ """)
+ session.execute("""
+ INSERT INTO dmc (id, value) VALUES(textAsBlob('delete-key'), textAsBlob('testvalue'))
+ """)
+ session.execute("""
+ INSERT INTO dmc (id, value) VALUES(textAsBlob('get-and-put-key'), textAsBlob('testvalue'))
+ """)
+ }
+
+ @AfterClass
+ static void cleanup() {
+ session.execute("TRUNCATE dmc")
+ }
+
+ Serializer<String> serializer = { str, os ->
+ os.write(str.bytes)
+ } as Serializer
+
+ Deserializer<String> deserializer = { input ->
+ new String(input)
+ } as Deserializer
+
+ @Test
+ void testContainsKey() {
+ def contains = distributedMapCache.containsKey("contains-key", serializer)
+ assert contains
+ }
+
+ @Test
+ void testGetAndPutIfAbsent() {
+ def result = distributedMapCache.getAndPutIfAbsent('get-and-put-key', 'testing', serializer, serializer, deserializer)
+ assert result == 'testvalue'
+ }
+
+ @Test
+ void testRemove() {
+ distributedMapCache.remove("delete-key", serializer)
+ }
+
+ @Test
+ void testGet() {
+ def result = distributedMapCache.get("contains-key", serializer, deserializer)
+ assert result == "testvalue"
+ }
+
+ @Test
+ void testPut() {
+ distributedMapCache.put("put-key", "sometestdata", serializer, serializer)
+ Thread.sleep(1000)
+ assert distributedMapCache.containsKey("put-key", serializer)
+ }
+
+ @Test
+ void testPutIfAbsent() {
+ assert distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer)
+ assert !distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer)
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
index abd07ee..a8c2672 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
@@ -36,6 +36,11 @@
<artifactId>nifi-cassandra-services</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-cassandra-distributedmapcache-service</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
index be2e23a..bb538c0 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>nifi-cassandra-processors</module>
<module>nifi-cassandra-nar</module>
+ <module>nifi-cassandra-distributedmapcache-service</module>
<module>nifi-cassandra-services-api</module>
<module>nifi-cassandra-services-api-nar</module>
<module>nifi-cassandra-services</module>