You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/25 16:03:54 UTC

[nifi] branch main updated: NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests - Replaced TestServerAndClient with separate classes for Set Server and Map Server - Implemented before and after annotations for starting and stopping server instances NIFI-9625 Added check for cache directory existence before clean NIFI-9625 Updated Map and Set Cache Server Tests to use random port

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b40b5a7  NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests - Replaced TestServerAndClient with separate classes for Set Server and Map Server - Implemented before and after annotations for starting and stopping server instances NIFI-9625 Added check for cache directory existence before clean NIFI-9625 Updated Map and Set Cache Server Tests to use random port
b40b5a7 is described below

commit b40b5a7c96d973f08a04a8d382e8c9fbce9f758a
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Jan 24 16:13:16 2022 -0600

    NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests
    - Replaced TestServerAndClient with separate classes for Set Server and Map Server
    - Implemented before and after annotations for starting and stopping server instances
    NIFI-9625 Added check for cache directory existence before clean
    NIFI-9625 Updated Map and Set Cache Server Tests to use random port
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../cache/server/TestServerAndClient.java          | 810 ---------------------
 .../cache/server/map/DistributedMapCacheTest.java  |   3 +-
 .../server/map/DistributedMapCacheTlsTest.java     |   3 +-
 .../map/TestDistributedMapServerAndClient.java     | 328 +++++++++
 .../cache/server/set/DistributedSetCacheTest.java  |   3 +-
 .../set/TestDistributedSetServerAndClient.java     | 269 +++++++
 6 files changed, 603 insertions(+), 813 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
deleted file mode 100644
index 916908b..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ /dev/null
@@ -1,810 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
-import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
-import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockConfigurationContext;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.apache.nifi.util.MockPropertyValue;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-
-public class TestServerAndClient {
-
-    private static final Logger LOGGER;
-
-    static {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.AbstractCacheServer", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.client.DistributedMapCacheClientService", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.TestServerAndClient", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.remote.io.socket.ssl.SSLSocketChannel", "trace");
-        LOGGER = LoggerFactory.getLogger(TestServerAndClient.class);
-    }
-
-    @Test
-    public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedSetCacheServer server = new SetServer();
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        final DistributedSetCacheClientService client = createClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-        final boolean added = client.addIfAbsent("test", serializer);
-        assertTrue(added);
-
-        final boolean contains = client.contains("test", serializer);
-        assertTrue(contains);
-
-        final boolean addedAgain = client.addIfAbsent("test", serializer);
-        assertFalse(addedAgain);
-
-        final boolean removed = client.remove("test", serializer);
-        assertTrue(removed);
-
-        final boolean containedAfterRemove = client.contains("test", serializer);
-        assertFalse(containedAfterRemove);
-
-        server.shutdownServer();
-    }
-
-    @Test
-    public void testPersistentSetServerAndClient() throws InitializationException, IOException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
-        final File dataFile = new File("target/cache-data");
-        deleteRecursively(dataFile);
-
-        // Create server
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedSetCacheServer server = new SetServer();
-        runner.addControllerService("server", server);
-        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.enableControllerService(server);
-
-        DistributedSetCacheClientService client = createClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-        final boolean added = client.addIfAbsent("test", serializer);
-        final boolean added2 = client.addIfAbsent("test2", serializer);
-        assertTrue(added);
-        assertTrue(added2);
-
-        final boolean contains = client.contains("test", serializer);
-        final boolean contains2 = client.contains("test2", serializer);
-        assertTrue(contains);
-        assertTrue(contains2);
-
-        final boolean addedAgain = client.addIfAbsent("test", serializer);
-        assertFalse(addedAgain);
-
-        final boolean removed = client.remove("test", serializer);
-        assertTrue(removed);
-
-        final boolean containedAfterRemove = client.contains("test", serializer);
-        assertFalse(containedAfterRemove);
-
-        client.close();
-        server.shutdownServer();
-
-        final DistributedSetCacheServer newServer = new SetServer();
-        runner.addControllerService("server2", newServer);
-        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.enableControllerService(newServer);
-        client = createClient(newServer.getPort());
-
-        assertFalse(client.contains("test", serializer));
-        assertTrue(client.contains("test2", serializer));
-
-        client.close();
-        newServer.shutdownServer();
-    }
-
-    @Test
-    public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final File dataFile = new File("target/cache-data");
-        deleteRecursively(dataFile);
-
-        // Create server
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedSetCacheServer server = new SetServer();
-        runner.addControllerService("server", server);
-        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
-        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
-        runner.enableControllerService(server);
-
-        DistributedSetCacheClientService client = createClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-        final boolean added = client.addIfAbsent("test", serializer);
-        waitABit();
-        final boolean added2 = client.addIfAbsent("test2", serializer);
-        waitABit();
-        final boolean added3 = client.addIfAbsent("test3", serializer);
-        waitABit();
-        assertTrue(added);
-        assertTrue(added2);
-        assertTrue(added3);
-
-        final boolean contains = client.contains("test", serializer);
-        final boolean contains2 = client.contains("test2", serializer);
-        assertTrue(contains);
-        assertTrue(contains2);
-
-        final boolean addedAgain = client.addIfAbsent("test", serializer);
-        assertFalse(addedAgain);
-
-        final boolean added4 = client.addIfAbsent("test4", serializer);
-        assertTrue(added4);
-
-        // ensure that added3 was evicted because it was used least frequently
-        assertFalse(client.contains("test3", serializer));
-
-        client.close();
-        server.shutdownServer();
-
-
-        final DistributedSetCacheServer newServer = new SetServer();
-        runner.addControllerService("server2", newServer);
-        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.enableControllerService(newServer);
-        client = createClient(newServer.getPort());
-
-        assertTrue(client.contains("test", serializer));
-        assertTrue(client.contains("test2", serializer));
-        assertFalse(client.contains("test3", serializer));
-        assertTrue(client.contains("test4", serializer));
-
-        client.close();
-        newServer.shutdownServer();
-    }
-
-    @Test
-    public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final File dataFile = new File("target/cache-data");
-        deleteRecursively(dataFile);
-
-        // Create server
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedMapCacheServer server = new MapServer();
-        runner.addControllerService("server", server);
-        runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3");
-        runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU);
-        runner.enableControllerService(server);
-
-        DistributedMapCacheClientService client = createMapClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-        final boolean added = client.putIfAbsent("test", "1", serializer, serializer);
-        waitABit();
-        final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer);
-        waitABit();
-        final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer);
-        waitABit();
-        assertTrue(added);
-        assertTrue(added2);
-        assertTrue(added3);
-
-        final boolean contains = client.containsKey("test", serializer);
-        final boolean contains2 = client.containsKey("test2", serializer);
-        assertTrue(contains);
-        assertTrue(contains2);
-
-        final Deserializer<String> deserializer = new StringDeserializer();
-        final Set<String> keys = client.keySet(deserializer);
-        assertEquals(3, keys.size());
-        assertTrue(keys.contains("test"));
-        assertTrue(keys.contains("test2"));
-        assertTrue(keys.contains("test3"));
-
-        final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
-        assertFalse(addedAgain);
-
-        final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer);
-        assertTrue(added4);
-
-        // ensure that added3 was evicted because it was used least frequently
-        assertFalse(client.containsKey("test3", serializer));
-
-        client.close();
-        server.shutdownServer();
-
-        final DistributedMapCacheServer newServer = new MapServer();
-        runner.addControllerService("server2", newServer);
-        runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.enableControllerService(newServer);
-        client = createMapClient(newServer.getPort());
-
-        assertTrue(client.containsKey("test", serializer));
-        assertTrue(client.containsKey("test2", serializer));
-        assertFalse(client.containsKey("test3", serializer));
-        assertTrue(client.containsKey("test4", serializer));
-
-        // Test removeByPattern, the first two should be removed and the last should remain
-        client.put("test.1", "1", serializer, serializer);
-        client.put("test.2", "2", serializer, serializer);
-        client.put("test3", "2", serializer, serializer);
-        final long removedTwo = client.removeByPattern("test\\..*");
-        assertEquals(2L, removedTwo);
-        assertFalse(client.containsKey("test.1", serializer));
-        assertFalse(client.containsKey("test.2", serializer));
-        assertTrue(client.containsKey("test3", serializer));
-
-        // test removeByPatternAndGet
-        client.put("test.1", "1", serializer, serializer);
-        client.put("test.2", "2", serializer, serializer);
-        Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
-        assertEquals(2, removed.size());
-        assertTrue(removed.containsKey("test.1"));
-        assertTrue(removed.containsKey("test.2"));
-        assertFalse(client.containsKey("test.1", serializer));
-        assertFalse(client.containsKey("test.2", serializer));
-        assertTrue(client.containsKey("test3", serializer));
-        removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
-        assertEquals(0, removed.size());
-
-        client.close();
-        newServer.shutdownServer();
-    }
-
-    @Test
-    public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
-        final File dataFile = new File("target/cache-data");
-        deleteRecursively(dataFile);
-
-        // Create server
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedSetCacheServer server = new SetServer();
-        runner.addControllerService("server", server);
-        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
-        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
-        runner.enableControllerService(server);
-
-        DistributedSetCacheClientService client = createClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-
-        // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
-        // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between
-        final boolean added = client.addIfAbsent("test", serializer);
-        waitABit();
-        final boolean added2 = client.addIfAbsent("test2", serializer);
-        waitABit();
-        final boolean added3 = client.addIfAbsent("test3", serializer);
-        waitABit();
-
-        assertTrue(added);
-        assertTrue(added2);
-        assertTrue(added3);
-
-        final boolean contains = client.contains("test", serializer);
-        final boolean contains2 = client.contains("test2", serializer);
-        assertTrue(contains);
-        assertTrue(contains2);
-
-        final boolean addedAgain = client.addIfAbsent("test", serializer);
-        assertFalse(addedAgain);
-
-        final boolean added4 = client.addIfAbsent("test4", serializer);
-        assertTrue(added4);
-
-        // ensure that added3 was evicted because it was used least frequently
-        assertFalse(client.contains("test", serializer));
-        assertTrue(client.contains("test3", serializer));
-
-        client.close();
-        server.shutdownServer();
-
-        final DistributedSetCacheServer newServer = new SetServer();
-        runner.addControllerService("server2", newServer);
-        runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
-        runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
-        runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
-        runner.enableControllerService(newServer);
-
-        client = createClient(newServer.getPort());
-        assertFalse(client.contains("test", serializer));
-        assertTrue(client.contains("test2", serializer));
-        assertTrue(client.contains("test3", serializer));
-        assertTrue(client.contains("test4", serializer));
-
-        client.close();
-        newServer.shutdownServer();
-    }
-
-    @Test
-    public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
-        // Create server
-        final DistributedMapCacheServer server = new MapServer();
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
-        runner.addControllerService("client", client);
-        runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
-        runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
-        runner.enableControllerService(client);
-
-        final Serializer<String> valueSerializer = new StringSerializer();
-        final Serializer<String> keySerializer = new StringSerializer();
-        final Deserializer<String> deserializer = new StringDeserializer();
-
-        final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
-        assertEquals(null, original);
-        LOGGER.debug("end getAndPutIfAbsent");
-
-        final boolean contains = client.containsKey("testKey", keySerializer);
-        assertTrue(contains);
-        LOGGER.debug("end containsKey");
-
-        final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
-        assertFalse(added);
-        LOGGER.debug("end putIfAbsent");
-
-        final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
-        assertEquals("test", originalAfterPut);
-        LOGGER.debug("end getAndPutIfAbsent");
-
-        final boolean removed = client.remove("testKey", keySerializer);
-        assertTrue(removed);
-        LOGGER.debug("end remove");
-
-        client.put("testKey", "testValue", keySerializer, valueSerializer);
-        assertTrue(client.containsKey("testKey", keySerializer));
-        String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
-        assertEquals("testValue", removedValue);
-        removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
-        assertNull(removedValue);
-
-        final Set<String> keys = client.keySet(deserializer);
-        assertEquals(0, keys.size());
-
-        // Test removeByPattern, the first two should be removed and the last should remain
-        client.put("test.1", "1", keySerializer, keySerializer);
-        client.put("test.2", "2", keySerializer, keySerializer);
-        client.put("test3", "2", keySerializer, keySerializer);
-        final long removedTwo = client.removeByPattern("test\\..*");
-        assertEquals(2L, removedTwo);
-        assertFalse(client.containsKey("test.1", keySerializer));
-        assertFalse(client.containsKey("test.2", keySerializer));
-        assertTrue(client.containsKey("test3", keySerializer));
-
-        final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
-        assertFalse(containedAfterRemove);
-
-        client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
-        runner.disableControllerService(client);
-        try {
-            client.containsKey("testKey", keySerializer);
-            fail("Should be closed and not accessible");
-        } catch (final Exception e) {
-
-        }
-
-        DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
-        runner.addControllerService("client2", client2);
-        runner.setProperty(client2, DistributedMapCacheClientService.HOSTNAME, "localhost");
-        runner.setProperty(client2, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
-        runner.enableControllerService(client2);
-
-        assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer));
-        assertTrue(client2.containsKey("testKey", keySerializer));
-        server.shutdownServer();
-        Thread.sleep(1000);
-        try {
-            client2.containsKey("testKey", keySerializer);
-            fail("Should have blown exception!");
-        } catch (final ConnectException e) {
-            client2 = null;
-        }
-        LOGGER.debug("end testNonPersistentMapServerAndClient");
-    }
-
-    @Test
-    public void testClientTermination() throws InitializationException, IOException, InterruptedException {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-        // Create server
-        final DistributedMapCacheServer server = new MapServer();
-        final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
-        server.initialize(serverInitContext);
-
-        final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
-        final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup());
-        server.startServer(serverContext);
-
-        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
-        client.initialize(clientInitContext);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
-        MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
-        client.onEnabled(clientContext);
-        final Serializer<String> valueSerializer = new StringSerializer();
-        final Serializer<String> keySerializer = new StringSerializer();
-        final Deserializer<String> deserializer = new StringDeserializer();
-
-        final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
-        assertEquals(null, original);
-
-        final boolean contains = client.containsKey("testKey", keySerializer);
-        assertTrue(contains);
-
-        final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
-        assertFalse(added);
-
-        final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
-        assertEquals("test", originalAfterPut);
-
-        final boolean removed = client.remove("testKey", keySerializer);
-        assertTrue(removed);
-
-        final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
-        assertFalse(containedAfterRemove);
-
-        client = null;
-        clientInitContext = null;
-        clientContext = null;
-        Thread.sleep(2000);
-        System.gc();
-        server.shutdownServer();
-    }
-
-    @Test
-    public void testOptimisticLock() throws Exception {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
-        // Create server
-        final DistributedMapCacheServer server = new MapServer();
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
-        client1.initialize(clientInitContext1);
-
-        DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
-        client2.initialize(clientInitContext2);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
-        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
-
-        MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
-        client1.onEnabled(clientContext1);
-        MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
-        client2.onEnabled(clientContext2);
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-
-        final String key = "test-optimistic-lock";
-
-        // Ensure there's no existing key
-        assertFalse(client1.containsKey(key, stringSerializer));
-        assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
-
-        // Client 1 inserts the key.
-        client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
-
-        // Client 1 and 2 fetch the key
-        AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
-        AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
-        assertEquals(new Long(0), c1.getRevision().orElse(0L));
-        assertEquals("valueC1-0", c1.getValue());
-        assertEquals(new Long(0), c2.getRevision().orElse(0L));
-        assertEquals("valueC1-0", c2.getValue());
-
-        // Client 1 replace
-        c1.setValue("valueC1-1");
-        boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer);
-        assertTrue("C1 should be able to replace the key", c1Result);
-        // Client 2 replace with the old revision
-        c2.setValue("valueC2-1");
-        boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer);
-        assertFalse("C2 shouldn't be able to replace the key", c2Result);
-
-        // Client 2 fetch the key again
-        c2 = client2.fetch(key, stringSerializer, stringDeserializer);
-        assertEquals("valueC1-1", c2.getValue());
-        assertEquals(new Long(1), c2.getRevision().orElse(0L));
-
-        // Now, Client 2 knows the correct revision so it can replace the key
-        c2.setValue("valueC2-2");
-        c2Result = client2.replace(c2, stringSerializer, stringSerializer);
-        assertTrue("C2 should be able to replace the key", c2Result);
-
-        // Assert the cache
-        c2 = client2.fetch(key, stringSerializer, stringDeserializer);
-        assertEquals("valueC2-2", c2.getValue());
-        assertEquals(new Long(2), c2.getRevision().orElse(0L));
-
-        client1.close();
-        client2.close();
-        server.shutdownServer();
-    }
-
-    @Test
-    public void testBackwardCompatibility() throws Exception {
-        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-
-        // Create a server that only supports protocol version 1.
-        final DistributedMapCacheServer server = new MapServer() {
-            @Override
-            protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
-                return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
-                    @Override
-                    protected StandardVersionNegotiator getVersionNegotiator() {
-                        return new StandardVersionNegotiator(1);
-                    }
-                };
-            }
-        };
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
-        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
-        client.initialize(clientInitContext1);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
-        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
-
-        MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
-        client.onEnabled(clientContext);
-
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-
-        final String key = "test-backward-compatibility";
-
-        // Version 1 operations should work
-        client.put(key, "value1", stringSerializer, stringSerializer);
-        assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
-
-        assertTrue(client.containsKey(key, stringSerializer));
-
-        try {
-            client.fetch(key, stringSerializer, stringDeserializer);
-            fail("Version 2 operations should NOT work.");
-        } catch (UnsupportedOperationException e) {
-        }
-
-        try {
-            AtomicCacheEntry<String,String,Long> entry = new AtomicCacheEntry<>(key, "value2", 0L);
-            client.replace(entry, stringSerializer, stringSerializer);
-            fail("Version 2 operations should NOT work.");
-        } catch (UnsupportedOperationException e) {
-        }
-
-        try {
-            Set<String> keys = client.keySet(stringDeserializer);
-            fail("Version 3 operations should NOT work.");
-        } catch (UnsupportedOperationException e) {
-        }
-
-        try {
-            String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer);
-            fail("Version 3 operations should NOT work.");
-        } catch (UnsupportedOperationException e) {
-        }
-
-        try {
-            Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer);
-            fail("Version 3 operations should NOT work.");
-        } catch (UnsupportedOperationException e) {
-        }
-        client.close();
-        server.shutdownServer();
-    }
-
-    @Test
-    public void testLimitServiceReadSizeMap() throws InitializationException, IOException {
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedMapCacheServer server = new MapServer();
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        final DistributedMapCacheClientService client = createMapClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-
-        final String key = "key";
-        final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
-        final int belowThreshold = maxReadSize / key.length();
-        final int aboveThreshold = belowThreshold + 1;
-        final String keyBelowThreshold = StringUtils.repeat(key, belowThreshold);
-        final String keyAboveThreshold = StringUtils.repeat(key, aboveThreshold);
-        assertFalse(client.containsKey(keyBelowThreshold, serializer));
-        assertThrows(IOException.class, () -> client.containsKey(keyAboveThreshold, serializer));
-    }
-
-    @Test
-    public void testLimitServiceReadSizeSet() throws InitializationException, IOException {
-        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-        final DistributedSetCacheServer server = new SetServer();
-        runner.addControllerService("server", server);
-        runner.enableControllerService(server);
-
-        final DistributedSetCacheClientService client = createClient(server.getPort());
-        final Serializer<String> serializer = new StringSerializer();
-
-        final String value = "value";
-        final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
-        final int belowThreshold = maxReadSize / value.length();
-        final int aboveThreshold = belowThreshold + 1;
-        final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
-        final String valueAboveThreshold = StringUtils.repeat(value, aboveThreshold);
-        assertFalse(client.contains(valueBelowThreshold, serializer));
-        assertThrows(IOException.class, () -> client.contains(valueAboveThreshold, serializer));
-    }
-
-    private void waitABit() {
-        try {
-            Thread.sleep(10L);
-        } catch (final InterruptedException e) {
-        }
-    }
-
-    private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
-        final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
-        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
-        client.initialize(clientInitContext);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
-        final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
-        client.onEnabled(clientContext);
-
-        return client;
-    }
-
-    private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException {
-        final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
-        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
-        client.initialize(clientInitContext);
-
-        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
-        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
-        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
-        final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
-        client.onEnabled(clientContext);
-
-        return client;
-    }
-
-    private static class StringSerializer implements Serializer<String> {
-
-        @Override
-        public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
-            output.write(value.getBytes(StandardCharsets.UTF_8));
-        }
-    }
-
-    private static class StringDeserializer implements Deserializer<String> {
-
-        @Override
-        public String deserialize(final byte[] input) throws DeserializationException, IOException {
-            return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
-        }
-    }
-
-    private static void deleteRecursively(final File dataFile) throws IOException {
-        if (dataFile == null || !dataFile.exists()) {
-            return;
-        }
-
-        final File[] children = dataFile.listFiles();
-        for (final File child : children) {
-            if (child.isDirectory()) {
-                deleteRecursively(child);
-            } else {
-                for (int i = 0; i < 100 && child.exists(); i++) {
-                    child.delete();
-                }
-
-                if (child.exists()) {
-                    throw new IOException("Could not delete " + dataFile.getAbsolutePath());
-                }
-            }
-        }
-    }
-
-    private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor> descriptors) {
-        descriptors.remove(DistributedCacheServer.PORT);
-        descriptors.add(new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port to listen on for incoming connections")
-            .required(true)
-            .addValidator(StandardValidators.createLongValidator(0L, 65535L, true))
-            .defaultValue("0")
-            .build());
-        return descriptors;
-    }
-
-    private static class SetServer extends DistributedSetCacheServer {
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            return replacePortDescriptor(super.getSupportedPropertyDescriptors());
-        }
-    }
-
-    private static class MapServer extends DistributedMapCacheServer {
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            return replacePortDescriptor(super.getSupportedPropertyDescriptors());
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
index 254f925..42e9f56 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.AfterClass;
@@ -60,7 +61,7 @@ public class DistributedMapCacheTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        final String port = DistributedMapCacheServer.PORT.getDefaultValue();
+        final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
 
         server = new DistributedMapCacheServer();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
index 2608424..9fddfaf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
@@ -22,6 +22,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
 import org.apache.nifi.security.util.TlsConfiguration;
@@ -62,7 +63,7 @@ public class DistributedMapCacheTlsTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        final String port = DistributedMapCacheServer.PORT.getDefaultValue();
+        final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
         sslContextService = createSslContextService();
         runner.addControllerService(sslContextService.getIdentifier(), sslContextService);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
new file mode 100644
index 0000000..dc2d0ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDistributedMapServerAndClient {
+
+    private final File dataFile = new File("target/cache-data");
+
+    private TestRunner runner;
+
+    private DistributedMapCacheServer server;
+
+    @BeforeEach
+    public void setRunner() throws InitializationException, IOException {
+        if (dataFile.exists()) {
+            FileUtils.cleanDirectory(dataFile);
+        }
+
+        runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+
+        server = new DistributedMapCacheServer();
+        runner.addControllerService("server", server);
+
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(server, DistributedMapCacheServer.PORT, Integer.toString(port));
+    }
+
+    @AfterEach
+    public void shutdownServer() throws IOException {
+        server.shutdownServer();
+    }
+
+    @Test
+    public void testNonPersistentMapServerAndClient() throws InitializationException, IOException {
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+        try {
+            runner.addControllerService("client", client);
+            runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
+            runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+            runner.enableControllerService(client);
+
+            final Serializer<String> valueSerializer = new StringSerializer();
+            final Serializer<String> keySerializer = new StringSerializer();
+            final Deserializer<String> deserializer = new StringDeserializer();
+
+            final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
+            assertNull(original);
+
+            final boolean contains = client.containsKey("testKey", keySerializer);
+            assertTrue(contains);
+
+            final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
+            assertFalse(added);
+
+            final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
+            assertEquals("test", originalAfterPut);
+
+            final boolean removed = client.remove("testKey", keySerializer);
+            assertTrue(removed);
+
+            client.put("testKey", "testValue", keySerializer, valueSerializer);
+            assertTrue(client.containsKey("testKey", keySerializer));
+            String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+            assertEquals("testValue", removedValue);
+            removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+            assertNull(removedValue);
+
+            final Set<String> keys = client.keySet(deserializer);
+            assertEquals(0, keys.size());
+
+            // Test removeByPattern, the first two should be removed and the last should remain
+            client.put("test.1", "1", keySerializer, keySerializer);
+            client.put("test.2", "2", keySerializer, keySerializer);
+            client.put("test3", "2", keySerializer, keySerializer);
+            final long removedTwo = client.removeByPattern("test\\..*");
+            assertEquals(2L, removedTwo);
+            assertFalse(client.containsKey("test.1", keySerializer));
+            assertFalse(client.containsKey("test.2", keySerializer));
+            assertTrue(client.containsKey("test3", keySerializer));
+
+            final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
+            assertFalse(containedAfterRemove);
+
+            client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
+            runner.disableControllerService(client);
+
+            assertThrows(Exception.class, () -> client.containsKey("testKey", keySerializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testOptimisticLock() throws Exception {
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
+        client1.initialize(clientInitContext1);
+
+        DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
+        client2.initialize(clientInitContext2);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+        MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+        client1.onEnabled(clientContext1);
+        MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
+        client2.onEnabled(clientContext2);
+
+        try {
+            final Serializer<String> stringSerializer = new StringSerializer();
+            final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+            final String key = "test-optimistic-lock";
+
+            // Ensure there's no existing key
+            assertFalse(client1.containsKey(key, stringSerializer));
+            assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
+
+            // Client 1 inserts the key.
+            client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
+
+            // Client 1 and 2 fetch the key
+            AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
+            AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+            assertEquals(new Long(0), c1.getRevision().orElse(0L));
+            assertEquals("valueC1-0", c1.getValue());
+            assertEquals(new Long(0), c2.getRevision().orElse(0L));
+            assertEquals("valueC1-0", c2.getValue());
+
+            // Client 1 replace
+            c1.setValue("valueC1-1");
+            boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer);
+            assertTrue(c1Result, "C1 should be able to replace the key");
+            // Client 2 replace with the old revision
+            c2.setValue("valueC2-1");
+            boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer);
+            assertFalse(c2Result, "C2 shouldn't be able to replace the key");
+
+            // Client 2 fetch the key again
+            c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+            assertEquals("valueC1-1", c2.getValue());
+            assertEquals(new Long(1), c2.getRevision().orElse(0L));
+
+            // Now, Client 2 knows the correct revision so it can replace the key
+            c2.setValue("valueC2-2");
+            c2Result = client2.replace(c2, stringSerializer, stringSerializer);
+            assertTrue(c2Result, "C2 should be able to replace the key");
+
+            // Assert the cache
+            c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+            assertEquals("valueC2-2", c2.getValue());
+            assertEquals(new Long(2), c2.getRevision().orElse(0L));
+        } finally {
+            client1.close();
+            client2.close();
+        }
+    }
+
+    @Test
+    public void testBackwardCompatibility() throws Exception {
+        // Create a server that only supports protocol version 1.
+        server = new DistributedMapCacheServer() {
+            @Override
+            protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
+                return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
+                    @Override
+                    protected StandardVersionNegotiator getVersionNegotiator() {
+                        return new StandardVersionNegotiator(1);
+                    }
+                };
+            }
+        };
+        runner.addControllerService("server", server);
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(server, DistributedMapCacheServer.PORT, Integer.toString(port));
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
+        client.initialize(clientInitContext1);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+        MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+        client.onEnabled(clientContext);
+
+        try {
+            final Serializer<String> stringSerializer = new StringSerializer();
+            final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+            final String key = "test-backward-compatibility";
+
+            // Version 1 operations should work
+            client.put(key, "value1", stringSerializer, stringSerializer);
+            assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
+
+            assertTrue(client.containsKey(key, stringSerializer));
+
+            assertThrows(UnsupportedOperationException.class, () -> client.fetch(key, stringSerializer, stringDeserializer));
+
+            AtomicCacheEntry<String, String, Long> entry = new AtomicCacheEntry<>(key, "value2", 0L);
+            assertThrows(UnsupportedOperationException.class, () -> client.replace(entry, stringSerializer, stringSerializer));
+
+            assertThrows(UnsupportedOperationException.class, () -> client.keySet(stringDeserializer));
+            assertThrows(UnsupportedOperationException.class, () -> client.removeAndGet("v.*", stringSerializer, stringDeserializer));
+            assertThrows(UnsupportedOperationException.class, () ->client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testLimitServiceReadSize() throws InitializationException, IOException {
+        runner.enableControllerService(server);
+
+        final DistributedMapCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+
+            final String value = "value";
+            final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+            final int belowThreshold = maxReadSize / value.length();
+            final int aboveThreshold = belowThreshold + 1;
+            final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
+            final String valueAboveThreshold = StringUtils.repeat(value, aboveThreshold);
+            assertFalse(client.containsKey(valueBelowThreshold, serializer));
+            assertThrows(IOException.class, () -> client.containsKey(valueAboveThreshold, serializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    private DistributedMapCacheClientService createClient(final int port) throws InitializationException {
+        final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+        client.initialize(clientInitContext);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
+        final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
+        client.onEnabled(clientContext);
+
+        return client;
+    }
+
+    private static class StringSerializer implements Serializer<String> {
+
+        @Override
+        public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+            output.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private static class StringDeserializer implements Deserializer<String> {
+
+        @Override
+        public String deserialize(final byte[] input) throws DeserializationException {
+            return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
index 7580555..a5b6342 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
 import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.AfterClass;
@@ -51,7 +52,7 @@ public class DistributedSetCacheTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
-        final String port = DistributedSetCacheServer.PORT.getDefaultValue();
+        final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
 
         server = new DistributedSetCacheServer();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
new file mode 100644
index 0000000..6484148
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mockito;
+
+public class TestDistributedSetServerAndClient {
+
+    private final File dataFile = new File("target/cache-data");
+
+    private TestRunner runner;
+
+    private DistributedSetCacheServer server;
+
+    @BeforeEach
+    public void setRunner() throws InitializationException, IOException {
+        if (dataFile.exists()) {
+            FileUtils.cleanDirectory(dataFile);
+        }
+
+        runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+
+        server = new DistributedSetCacheServer();
+        runner.addControllerService("server", server);
+
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(server, DistributedSetCacheServer.PORT, Integer.toString(port));
+    }
+
+    @AfterEach
+    public void shutdownServer() throws IOException {
+        server.shutdownServer();
+    }
+
+    @Test
+    public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final boolean added = client.addIfAbsent("test", serializer);
+            assertTrue(added);
+
+            final boolean contains = client.contains("test", serializer);
+            assertTrue(contains);
+
+            final boolean addedAgain = client.addIfAbsent("test", serializer);
+            assertFalse(addedAgain);
+
+            final boolean removed = client.remove("test", serializer);
+            assertTrue(removed);
+
+            final boolean containedAfterRemove = client.contains("test", serializer);
+            assertFalse(containedAfterRemove);
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testPersistentSetServerAndClient() throws InitializationException, IOException {
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final boolean added = client.addIfAbsent("test", serializer);
+            final boolean added2 = client.addIfAbsent("test2", serializer);
+            assertTrue(added);
+            assertTrue(added2);
+
+            final boolean contains = client.contains("test", serializer);
+            final boolean contains2 = client.contains("test2", serializer);
+            assertTrue(contains);
+            assertTrue(contains2);
+
+            final boolean addedAgain = client.addIfAbsent("test", serializer);
+            assertFalse(addedAgain);
+
+            final boolean removed = client.remove("test", serializer);
+            assertTrue(removed);
+
+            final boolean containedAfterRemove = client.contains("test", serializer);
+            assertFalse(containedAfterRemove);
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+            final boolean added = client.addIfAbsent("test", serializer);
+            waitABit();
+            final boolean added2 = client.addIfAbsent("test2", serializer);
+            waitABit();
+            final boolean added3 = client.addIfAbsent("test3", serializer);
+            waitABit();
+            assertTrue(added);
+            assertTrue(added2);
+            assertTrue(added3);
+
+            final boolean contains = client.contains("test", serializer);
+            final boolean contains2 = client.contains("test2", serializer);
+            assertTrue(contains);
+            assertTrue(contains2);
+
+            final boolean addedAgain = client.addIfAbsent("test", serializer);
+            assertFalse(addedAgain);
+
+            final boolean added4 = client.addIfAbsent("test4", serializer);
+            assertTrue(added4);
+
+            // ensure that added3 was evicted because it was used least frequently
+            assertFalse(client.contains("test3", serializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
+        runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+
+            // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
+            // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between
+            final boolean added = client.addIfAbsent("test", serializer);
+            waitABit();
+            final boolean added2 = client.addIfAbsent("test2", serializer);
+            waitABit();
+            final boolean added3 = client.addIfAbsent("test3", serializer);
+            waitABit();
+
+            assertTrue(added);
+            assertTrue(added2);
+            assertTrue(added3);
+
+            final boolean contains = client.contains("test", serializer);
+            final boolean contains2 = client.contains("test2", serializer);
+            assertTrue(contains);
+            assertTrue(contains2);
+
+            final boolean addedAgain = client.addIfAbsent("test", serializer);
+            assertFalse(addedAgain);
+
+            final boolean added4 = client.addIfAbsent("test4", serializer);
+            assertTrue(added4);
+
+            // ensure that added3 was evicted because it was used least frequently
+            assertFalse(client.contains("test", serializer));
+            assertTrue(client.contains("test3", serializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testLimitServiceReadSize() throws InitializationException, IOException {
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = createClient(server.getPort());
+        try {
+            final Serializer<String> serializer = new StringSerializer();
+
+            final String value = "value";
+            final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+            final int belowThreshold = maxReadSize / value.length();
+            final int aboveThreshold = belowThreshold + 1;
+            final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
+            final String valueAboveThreshold = StringUtils.repeat(value, aboveThreshold);
+            assertFalse(client.contains(valueBelowThreshold, serializer));
+            assertThrows(IOException.class, () -> client.contains(valueAboveThreshold, serializer));
+        } finally {
+            client.close();
+        }
+    }
+
+    private void waitABit() {
+        try {
+            TimeUnit.MILLISECONDS.sleep(10);
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
+        final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
+        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+        client.initialize(clientInitContext);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
+        final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
+        client.onEnabled(clientContext);
+
+        return client;
+    }
+
+    private static class StringSerializer implements Serializer<String> {
+
+        @Override
+        public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+            output.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}