You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/11/30 00:02:13 UTC

[nifi] branch main updated: NIFI-7821 Added Cassandra-based DMC.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3625a73  NIFI-7821 Added Cassandra-based DMC.
3625a73 is described below

commit 3625a73d91f9504e7117043402613ddbe3a0cbe9
Author: Mike Thomsen <mt...@apache.org>
AuthorDate: Thu Oct 29 15:02:14 2020 -0400

    NIFI-7821 Added Cassandra-based DMC.
    
    NIFI-7821 Updated configuration documentation.
    
    NIFI-7821 Fixed getAndPutIfAbsent and added int test.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4635
---
 .../pom.xml                                        | 111 ++++++++++
 .../cassandra/CassandraDistributedMapCache.java    | 241 +++++++++++++++++++++
 .../nifi/controller/cassandra/QueryUtils.java      |  44 ++++
 .../org.apache.nifi.controller.ControllerService   |  15 ++
 .../nifi/CassandraDistributedMapCacheIT.groovy     | 133 ++++++++++++
 .../src/test/java/.gitignore                       |   0
 .../nifi-cassandra-services-nar/pom.xml            |   5 +
 nifi-nar-bundles/nifi-cassandra-bundle/pom.xml     |   1 +
 8 files changed, 550 insertions(+)

diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
new file mode 100644
index 0000000..75939d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Built as a module of the Cassandra bundle. -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cassandra-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-cassandra-distributedmapcache-service</artifactId>
+    <version>1.13.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <!--
+          NOTE(review): the Java sources added with this module import nothing from the record,
+          Avro or Jackson artifacts declared below (nifi-record-serialization-service-api,
+          nifi-record, nifi-avro-record-utils, jackson-databind, nifi-mock-record-utils).
+          Confirm whether they are needed before keeping them.
+        -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.10.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-all</artifactId>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- APIs implemented / consumed by CassandraDistributedMapCache. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cassandra-services-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- Concrete session provider, used only by the integration test. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cassandra-services</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
new file mode 100644
index 0000000..a921ec1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.cassandra.CassandraSessionProviderService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.controller.cassandra.QueryUtils.createDeleteStatement;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createExistsQuery;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createFetchQuery;
+import static org.apache.nifi.controller.cassandra.QueryUtils.createInsertStatement;
+
+/**
+ * Controller service that implements {@link DistributedMapCacheClient} on top of a Cassandra table.
+ *
+ * <p>Keys and values are serialized with the caller-supplied serializers and bound as blobs to
+ * statements that are prepared once when the service is enabled.</p>
+ *
+ * <p>The compound operations ({@code putIfAbsent}, {@code getAndPutIfAbsent} and {@code remove})
+ * are each implemented as a read followed by a separate write. They are therefore NOT atomic:
+ * another writer can change the row between the two statements.</p>
+ */
+@Tags({"map", "cache", "distributed", "cassandra"})
+@CapabilityDescription("Provides a DistributedMapCache client that is based on Apache Cassandra.")
+public class CassandraDistributedMapCache extends AbstractControllerService implements DistributedMapCacheClient {
+    public static final PropertyDescriptor SESSION_PROVIDER = new PropertyDescriptor.Builder()
+            .name("cassandra-dmc-session-provider")
+            .displayName("Session Provider")
+            .description("The client service that will configure the cassandra client connection.")
+            .required(true)
+            .identifiesControllerService(CassandraSessionProviderService.class)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("cassandra-dmc-table-name")
+            .displayName("Table Name")
+            .description("The name of the table where the cache will be stored.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor KEY_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("cassandra-dmc-key-field-name")
+            .displayName("Key Field Name")
+            .description("The name of the field that acts as the unique key. (The CQL type should be \"blob\")")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor VALUE_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("cassandra-dmc-value-field-name")
+            .displayName("Value Field Name")
+            .description("The name of the field that will store the value. (The CQL type should be \"blob\")")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("cassandra-dmc-ttl")
+            .displayName("TTL")
+            .description("If configured, this will set a TTL (Time to Live) for each row inserted into the table so that " +
+                    "old cache items expire after a certain period of time.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+        SESSION_PROVIDER, TABLE_NAME, KEY_FIELD_NAME, VALUE_FIELD_NAME, TTL
+    ));
+
+    private CassandraSessionProviderService sessionProviderService;
+    private String tableName;
+    private String keyField;
+    private String valueField;
+    // TTL in seconds applied to inserted rows; null when the TTL property is not set.
+    private Long ttl;
+
+    private Session session;
+    private PreparedStatement deleteStatement;
+    private PreparedStatement existsStatement;
+    private PreparedStatement fetchStatement;
+    private PreparedStatement insertStatement;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Reads the configuration, obtains the Cassandra session from the session provider service
+     * and prepares the four statements used by the cache operations.
+     *
+     * @param context the configuration of this controller service
+     */
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) {
+        sessionProviderService = context.getProperty(SESSION_PROVIDER).asControllerService(CassandraSessionProviderService.class);
+        tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        keyField = context.getProperty(KEY_FIELD_NAME).evaluateAttributeExpressions().getValue();
+        valueField = context.getProperty(VALUE_FIELD_NAME).evaluateAttributeExpressions().getValue();
+
+        // Assign on both branches: the same instance is re-enabled after a configuration change, so a
+        // TTL from a previous enablement must not survive once the property has been cleared.
+        if (context.getProperty(TTL).isSet()) {
+            ttl = context.getProperty(TTL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+        } else {
+            ttl = null;
+        }
+
+        session = sessionProviderService.getCassandraSession();
+
+        deleteStatement = session.prepare(createDeleteStatement(keyField, tableName));
+        existsStatement = session.prepare(createExistsQuery(keyField, tableName));
+        fetchStatement = session.prepare(createFetchQuery(keyField, valueField, tableName));
+        insertStatement = session.prepare(createInsertStatement(keyField, valueField, tableName, ttl));
+    }
+
+    /**
+     * Drops the references to the session and the prepared statements. The session itself is not
+     * closed here; it was obtained from the session provider service.
+     */
+    @OnDisabled
+    public void onDisabled() {
+        session = null;
+        deleteStatement = null;
+        existsStatement = null;
+        fetchStatement = null;
+        insertStatement = null;
+        ttl = null;
+    }
+
+    /**
+     * Stores the entry only when no row exists for the key.
+     *
+     * <p>Implemented as {@link #containsKey} followed by {@link #put}; the pair is not atomic.</p>
+     *
+     * @return {@code true} if the entry was written, {@code false} if the key was already present
+     * @throws IOException if a serializer fails
+     */
+    @Override
+    public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        if (containsKey(k, keySerializer)) {
+            return false;
+        }
+
+        put(k, v, keySerializer, valueSerializer);
+        return true;
+    }
+
+    /**
+     * Returns the value currently stored for the key, or stores the supplied value when the key is absent.
+     *
+     * <p>The current value is fetched first and {@link #putIfAbsent} is invoked afterwards; the
+     * sequence is not atomic.</p>
+     *
+     * @return the previously stored value when the key was already present, {@code null} when the
+     *         supplied value was inserted
+     * @throws IOException if a serializer or the deserializer fails
+     */
+    @Override
+    public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> deserializer) throws IOException {
+        final V existing = get(k, keySerializer, deserializer);
+        final boolean inserted = putIfAbsent(k, v, keySerializer, valueSerializer);
+
+        return inserted ? null : existing;
+    }
+
+    /**
+     * Checks whether a row exists for the key by running the prepared COUNT query.
+     *
+     * @return {@code true} if the query reports at least one matching row
+     * @throws IOException if the key serializer fails
+     */
+    @Override
+    public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
+        final BoundStatement statement = existsStatement.bind();
+        statement.setBytes(0, ByteBuffer.wrap(serialize(k, serializer)));
+
+        final ResultSet rs = session.execute(statement);
+        final Iterator<Row> iterator = rs.iterator();
+        if (!iterator.hasNext()) {
+            return false;
+        }
+
+        // "exist_count" is the alias given to COUNT(*) by QueryUtils.createExistsQuery.
+        return iterator.next().getLong("exist_count") > 0;
+    }
+
+    /**
+     * Inserts the entry, overwriting any existing row with the same key. When a TTL is configured
+     * it is part of the prepared insert statement.
+     *
+     * @throws IOException if a serializer fails
+     */
+    @Override
+    public <K, V> void put(K k, V v, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final BoundStatement statement = insertStatement.bind();
+        statement.setBytes(0, ByteBuffer.wrap(serialize(k, keySerializer)));
+        statement.setBytes(1, ByteBuffer.wrap(serialize(v, valueSerializer)));
+        session.execute(statement);
+    }
+
+    /**
+     * Fetches and deserializes the value stored for the key.
+     *
+     * @return the deserialized value, or {@code null} when no row exists for the key or the value
+     *         column of the row is null
+     * @throws IOException if the key serializer or the deserializer fails
+     */
+    @Override
+    public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
+        final BoundStatement boundStatement = fetchStatement.bind();
+        boundStatement.setBytes(0, ByteBuffer.wrap(serialize(k, serializer)));
+
+        final ResultSet rs = session.execute(boundStatement);
+        final Iterator<Row> iterator = rs.iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+
+        final ByteBuffer buffer = iterator.next().getBytes(valueField);
+        if (buffer == null) {
+            // The row exists but its value column is null; treat it like a missing value.
+            return null;
+        }
+
+        // Copy exactly the bytes between position and limit. ByteBuffer.array() would expose the
+        // whole backing array, which can be larger than the column value when the buffer is a
+        // view into a bigger buffer, and is unavailable for read-only or direct buffers.
+        final byte[] content = new byte[buffer.remaining()];
+        buffer.duplicate().get(content);
+
+        return deserializer.deserialize(content);
+    }
+
+    /**
+     * No-op: this service does not own the Cassandra session it uses.
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /**
+     * Deletes the row for the key if one exists.
+     *
+     * <p>Implemented as {@link #containsKey} followed by a delete; the pair is not atomic.</p>
+     *
+     * @return {@code true} if a row existed for the key and a delete was issued, {@code false} if
+     *         the key was not present
+     * @throws IOException if the key serializer fails
+     */
+    @Override
+    public <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
+        if (!containsKey(k, serializer)) {
+            return false;
+        }
+
+        final BoundStatement delete = deleteStatement.bind();
+        delete.setBytes(0, ByteBuffer.wrap(serialize(k, serializer)));
+        session.execute(delete);
+
+        return true;
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public long removeByPattern(String s) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Serializes a key or a value into a byte array using the supplied serializer.
+     */
+    private <T> byte[] serialize(T item, Serializer<T> itemSerializer) throws IOException {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        itemSerializer.serialize(item, out);
+        out.close();
+
+        return out.toByteArray();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java
new file mode 100644
index 0000000..cc42dd7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/QueryUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cassandra;
+
+/**
+ * Builds the CQL text of the statements used by the Cassandra-backed distributed map cache.
+ *
+ * <p>Table and column names are interpolated into the statement text as-is (no quoting or
+ * escaping); only the key and value are left as {@code ?} bind markers.</p>
+ */
+public class QueryUtils {
+    private QueryUtils() {}
+
+    /**
+     * @return a DELETE statement that removes the row whose key column equals the bound value
+     */
+    public static String createDeleteStatement(String keyField, String table) {
+        return String.format("DELETE FROM %s WHERE %s = ?", table, keyField);
+    }
+
+    /**
+     * @return a SELECT that counts the rows matching the bound key; the count is aliased as
+     *         {@code exist_count}
+     */
+    public static String createExistsQuery(String keyField, String table) {
+        return String.format("SELECT COUNT(*) as exist_count FROM %s WHERE %s = ?", table, keyField);
+    }
+
+    /**
+     * @return a SELECT that reads the value column of the row matching the bound key
+     */
+    public static String createFetchQuery(String keyField, String valueField, String table) {
+        return String.format("SELECT %s FROM %s WHERE %s = ?", valueField, table, keyField);
+    }
+
+    /**
+     * @param ttl time to live appended to the statement via {@code using ttl}; {@code null} for no TTL clause
+     * @return an INSERT statement with bind markers for the key (index 0) and the value (index 1)
+     */
+    public static String createInsertStatement(String keyField, String valueField, String table, Long ttl) {
+        final String insert = String.format("INSERT INTO %s (%s, %s) VALUES(?, ?)", table, keyField, valueField);
+
+        if (ttl == null) {
+            return insert;
+        }
+
+        // Plain concatenation (Long.toString) always yields ASCII digits. String.format("%d") uses
+        // the default locale and may emit locale-specific digits, which would not be valid CQL.
+        return insert + " using ttl " + ttl;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..c3d32c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
new file mode 100644
index 0000000..4db419d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/groovy/org/apache/nifi/CassandraDistributedMapCacheIT.groovy
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi
+
+import com.datastax.driver.core.Session
+import org.apache.nifi.controller.cassandra.CassandraDistributedMapCache
+import org.apache.nifi.distributed.cache.client.Deserializer
+import org.apache.nifi.distributed.cache.client.Serializer
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.service.CassandraSessionProvider
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.AfterClass
+import org.junit.BeforeClass
+import org.junit.Test
+/**
+ * Integration test for CassandraDistributedMapCache. It needs a running Cassandra instance
+ * reachable at localhost:9042.
+ *
+ * Setup instructions:
+ *
+ * docker run -p 7000:7000 -p 9042:9042 --name cassandra --restart always -d cassandra:3
+ *
+ * docker exec -it cassandra cqlsh
+ *
+ * Keyspace CQL: create keyspace nifi_test with replication = { 'replication_factor': 1, 'class': 'SimpleStrategy' } ;
+ *
+ * Table CQL: create table dmc (id blob, value blob, primary key(id));
+ */
+class CassandraDistributedMapCacheIT {
+    static TestRunner runner
+    static CassandraDistributedMapCache distributedMapCache
+    static Session session
+
+    /**
+     * Wires a session provider and the cache service into a test runner, then seeds the table
+     * directly through the session with the rows the read-only tests expect.
+     */
+    @BeforeClass
+    static void setup() {
+        // The processor is only a host for the controller services; it never runs.
+        runner = TestRunners.newTestRunner(new AbstractProcessor() {
+            @Override
+            void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+
+            }
+        })
+        distributedMapCache = new CassandraDistributedMapCache()
+
+        def cassandraService = new CassandraSessionProvider()
+        runner.addControllerService("provider", cassandraService)
+        runner.addControllerService("dmc", distributedMapCache)
+        runner.setProperty(cassandraService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042")
+        runner.setProperty(cassandraService, CassandraSessionProvider.KEYSPACE, "nifi_test")
+        runner.setProperty(distributedMapCache, CassandraDistributedMapCache.SESSION_PROVIDER, "provider")
+        runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TABLE_NAME, "dmc")
+        runner.setProperty(distributedMapCache, CassandraDistributedMapCache.KEY_FIELD_NAME, "id")
+        runner.setProperty(distributedMapCache, CassandraDistributedMapCache.VALUE_FIELD_NAME, "value")
+        runner.setProperty(distributedMapCache, CassandraDistributedMapCache.TTL, "5 sec")
+        runner.enableControllerService(cassandraService)
+        runner.enableControllerService(distributedMapCache)
+        runner.assertValid()
+
+        // Seed rows are written without a TTL, so they stay available for the whole run.
+        session = cassandraService.getCassandraSession()
+        session.execute("""
+            INSERT INTO dmc (id, value) VALUES(textAsBlob('contains-key'), textAsBlob('testvalue'))
+        """)
+        session.execute("""
+            INSERT INTO dmc (id, value) VALUES(textAsBlob('delete-key'), textAsBlob('testvalue'))
+        """)
+        session.execute("""
+            INSERT INTO dmc (id, value) VALUES(textAsBlob('get-and-put-key'), textAsBlob('testvalue'))
+        """)
+    }
+
+    @AfterClass
+    static void cleanup() {
+        session.execute("TRUNCATE dmc")
+    }
+
+    // The seed rows are written with textAsBlob, i.e. as UTF-8 bytes, so the test serializer and
+    // deserializer use UTF-8 explicitly instead of the platform default charset.
+    Serializer<String> serializer = { str, os ->
+        os.write(str.getBytes("UTF-8"))
+    } as Serializer
+
+    Deserializer<String> deserializer = { input ->
+        new String(input, "UTF-8")
+    } as Deserializer
+
+    @Test
+    void testContainsKey() {
+        def contains = distributedMapCache.containsKey("contains-key", serializer)
+        assert contains
+    }
+
+    @Test
+    void testGetAndPutIfAbsent() {
+        // The key was seeded in setup, so the existing value must be returned.
+        def result = distributedMapCache.getAndPutIfAbsent('get-and-put-key', 'testing', serializer, serializer, deserializer)
+        assert result == 'testvalue'
+    }
+
+    @Test
+    void testRemove() {
+        distributedMapCache.remove("delete-key", serializer)
+        // Verify the seeded row is really gone rather than only checking that no exception was thrown.
+        assert !distributedMapCache.containsKey("delete-key", serializer)
+    }
+
+    @Test
+    void testGet() {
+        def result = distributedMapCache.get("contains-key", serializer, deserializer)
+        assert result == "testvalue"
+    }
+
+    @Test
+    void testPut() {
+        distributedMapCache.put("put-key", "sometestdata", serializer, serializer)
+        // Stays well below the 5 second TTL configured in setup.
+        Thread.sleep(1000)
+        assert distributedMapCache.containsKey("put-key", serializer)
+    }
+
+    @Test
+    void testPutIfAbsent() {
+        assert distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer)
+        assert !distributedMapCache.putIfAbsent("put-if-absent-key", "testingthis", serializer, serializer)
+    }
+}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/test/java/.gitignore
new file mode 100644
index 0000000..e69de29
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
index abd07ee..a8c2672 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>nifi-cassandra-services</artifactId>
             <version>1.13.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cassandra-distributedmapcache-service</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
index be2e23a..bb538c0 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/pom.xml
@@ -32,6 +32,7 @@
     <modules>
         <module>nifi-cassandra-processors</module>
         <module>nifi-cassandra-nar</module>
+        <module>nifi-cassandra-distributedmapcache-service</module>
         <module>nifi-cassandra-services-api</module>
         <module>nifi-cassandra-services-api-nar</module>
         <module>nifi-cassandra-services</module>