You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/25 16:03:54 UTC
[nifi] branch main updated: NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests - Replaced TestServerAndClient with separate classes for Set Server and Map Server - Implemented before and after annotations for starting and stopping server instances NIFI-9625 Added check for cache directory existence before clean NIFI-9625 Updated Map and Set Cache Server Tests to use random port
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b40b5a7 NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests - Replaced TestServerAndClient with separate classes for Set Server and Map Server - Implemented before and after annotations for starting and stopping server instances NIFI-9625 Added check for cache directory existence before clean NIFI-9625 Updated Map and Set Cache Server Tests to use random port
b40b5a7 is described below
commit b40b5a7c96d973f08a04a8d382e8c9fbce9f758a
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Jan 24 16:13:16 2022 -0600
NIFI-9625 This closes #5711. Refactored Distributed Cache Server and Client Tests
- Replaced TestServerAndClient with separate classes for Set Server and Map Server
- Implemented before and after annotations for starting and stopping server instances
NIFI-9625 Added check for cache directory existence before clean
NIFI-9625 Updated Map and Set Cache Server Tests to use random port
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../cache/server/TestServerAndClient.java | 810 ---------------------
.../cache/server/map/DistributedMapCacheTest.java | 3 +-
.../server/map/DistributedMapCacheTlsTest.java | 3 +-
.../map/TestDistributedMapServerAndClient.java | 328 +++++++++
.../cache/server/set/DistributedSetCacheTest.java | 3 +-
.../set/TestDistributedSetServerAndClient.java | 269 +++++++
6 files changed, 603 insertions(+), 813 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
deleted file mode 100644
index 916908b..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ /dev/null
@@ -1,810 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
-import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
-import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockConfigurationContext;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.apache.nifi.util.MockPropertyValue;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-
-public class TestServerAndClient {
-
- private static final Logger LOGGER;
-
- static {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.AbstractCacheServer", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.client.DistributedMapCacheClientService", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.distributed.cache.server.TestServerAndClient", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.remote.io.socket.ssl.SSLSocketChannel", "trace");
- LOGGER = LoggerFactory.getLogger(TestServerAndClient.class);
- }
-
- @Test
- public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
- // Create server
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedSetCacheServer server = new SetServer();
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- final DistributedSetCacheClientService client = createClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
- final boolean added = client.addIfAbsent("test", serializer);
- assertTrue(added);
-
- final boolean contains = client.contains("test", serializer);
- assertTrue(contains);
-
- final boolean addedAgain = client.addIfAbsent("test", serializer);
- assertFalse(addedAgain);
-
- final boolean removed = client.remove("test", serializer);
- assertTrue(removed);
-
- final boolean containedAfterRemove = client.contains("test", serializer);
- assertFalse(containedAfterRemove);
-
- server.shutdownServer();
- }
-
- @Test
- public void testPersistentSetServerAndClient() throws InitializationException, IOException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
- final File dataFile = new File("target/cache-data");
- deleteRecursively(dataFile);
-
- // Create server
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedSetCacheServer server = new SetServer();
- runner.addControllerService("server", server);
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.enableControllerService(server);
-
- DistributedSetCacheClientService client = createClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
- final boolean added = client.addIfAbsent("test", serializer);
- final boolean added2 = client.addIfAbsent("test2", serializer);
- assertTrue(added);
- assertTrue(added2);
-
- final boolean contains = client.contains("test", serializer);
- final boolean contains2 = client.contains("test2", serializer);
- assertTrue(contains);
- assertTrue(contains2);
-
- final boolean addedAgain = client.addIfAbsent("test", serializer);
- assertFalse(addedAgain);
-
- final boolean removed = client.remove("test", serializer);
- assertTrue(removed);
-
- final boolean containedAfterRemove = client.contains("test", serializer);
- assertFalse(containedAfterRemove);
-
- client.close();
- server.shutdownServer();
-
- final DistributedSetCacheServer newServer = new SetServer();
- runner.addControllerService("server2", newServer);
- runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.enableControllerService(newServer);
- client = createClient(newServer.getPort());
-
- assertFalse(client.contains("test", serializer));
- assertTrue(client.contains("test2", serializer));
-
- client.close();
- newServer.shutdownServer();
- }
-
- @Test
- public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
- // Create server
- final File dataFile = new File("target/cache-data");
- deleteRecursively(dataFile);
-
- // Create server
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedSetCacheServer server = new SetServer();
- runner.addControllerService("server", server);
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
- runner.enableControllerService(server);
-
- DistributedSetCacheClientService client = createClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
- final boolean added = client.addIfAbsent("test", serializer);
- waitABit();
- final boolean added2 = client.addIfAbsent("test2", serializer);
- waitABit();
- final boolean added3 = client.addIfAbsent("test3", serializer);
- waitABit();
- assertTrue(added);
- assertTrue(added2);
- assertTrue(added3);
-
- final boolean contains = client.contains("test", serializer);
- final boolean contains2 = client.contains("test2", serializer);
- assertTrue(contains);
- assertTrue(contains2);
-
- final boolean addedAgain = client.addIfAbsent("test", serializer);
- assertFalse(addedAgain);
-
- final boolean added4 = client.addIfAbsent("test4", serializer);
- assertTrue(added4);
-
- // ensure that added3 was evicted because it was used least frequently
- assertFalse(client.contains("test3", serializer));
-
- client.close();
- server.shutdownServer();
-
-
- final DistributedSetCacheServer newServer = new SetServer();
- runner.addControllerService("server2", newServer);
- runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.enableControllerService(newServer);
- client = createClient(newServer.getPort());
-
- assertTrue(client.contains("test", serializer));
- assertTrue(client.contains("test2", serializer));
- assertFalse(client.contains("test3", serializer));
- assertTrue(client.contains("test4", serializer));
-
- client.close();
- newServer.shutdownServer();
- }
-
- @Test
- public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
- // Create server
- final File dataFile = new File("target/cache-data");
- deleteRecursively(dataFile);
-
- // Create server
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedMapCacheServer server = new MapServer();
- runner.addControllerService("server", server);
- runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU);
- runner.enableControllerService(server);
-
- DistributedMapCacheClientService client = createMapClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
- final boolean added = client.putIfAbsent("test", "1", serializer, serializer);
- waitABit();
- final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer);
- waitABit();
- final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer);
- waitABit();
- assertTrue(added);
- assertTrue(added2);
- assertTrue(added3);
-
- final boolean contains = client.containsKey("test", serializer);
- final boolean contains2 = client.containsKey("test2", serializer);
- assertTrue(contains);
- assertTrue(contains2);
-
- final Deserializer<String> deserializer = new StringDeserializer();
- final Set<String> keys = client.keySet(deserializer);
- assertEquals(3, keys.size());
- assertTrue(keys.contains("test"));
- assertTrue(keys.contains("test2"));
- assertTrue(keys.contains("test3"));
-
- final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
- assertFalse(addedAgain);
-
- final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer);
- assertTrue(added4);
-
- // ensure that added3 was evicted because it was used least frequently
- assertFalse(client.containsKey("test3", serializer));
-
- client.close();
- server.shutdownServer();
-
- final DistributedMapCacheServer newServer = new MapServer();
- runner.addControllerService("server2", newServer);
- runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.enableControllerService(newServer);
- client = createMapClient(newServer.getPort());
-
- assertTrue(client.containsKey("test", serializer));
- assertTrue(client.containsKey("test2", serializer));
- assertFalse(client.containsKey("test3", serializer));
- assertTrue(client.containsKey("test4", serializer));
-
- // Test removeByPattern, the first two should be removed and the last should remain
- client.put("test.1", "1", serializer, serializer);
- client.put("test.2", "2", serializer, serializer);
- client.put("test3", "2", serializer, serializer);
- final long removedTwo = client.removeByPattern("test\\..*");
- assertEquals(2L, removedTwo);
- assertFalse(client.containsKey("test.1", serializer));
- assertFalse(client.containsKey("test.2", serializer));
- assertTrue(client.containsKey("test3", serializer));
-
- // test removeByPatternAndGet
- client.put("test.1", "1", serializer, serializer);
- client.put("test.2", "2", serializer, serializer);
- Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
- assertEquals(2, removed.size());
- assertTrue(removed.containsKey("test.1"));
- assertTrue(removed.containsKey("test.2"));
- assertFalse(client.containsKey("test.1", serializer));
- assertFalse(client.containsKey("test.2", serializer));
- assertTrue(client.containsKey("test3", serializer));
- removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
- assertEquals(0, removed.size());
-
- client.close();
- newServer.shutdownServer();
- }
-
- @Test
- public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
- final File dataFile = new File("target/cache-data");
- deleteRecursively(dataFile);
-
- // Create server
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedSetCacheServer server = new SetServer();
- runner.addControllerService("server", server);
- runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
- runner.enableControllerService(server);
-
- DistributedSetCacheClientService client = createClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
-
- // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
- // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between
- final boolean added = client.addIfAbsent("test", serializer);
- waitABit();
- final boolean added2 = client.addIfAbsent("test2", serializer);
- waitABit();
- final boolean added3 = client.addIfAbsent("test3", serializer);
- waitABit();
-
- assertTrue(added);
- assertTrue(added2);
- assertTrue(added3);
-
- final boolean contains = client.contains("test", serializer);
- final boolean contains2 = client.contains("test2", serializer);
- assertTrue(contains);
- assertTrue(contains2);
-
- final boolean addedAgain = client.addIfAbsent("test", serializer);
- assertFalse(addedAgain);
-
- final boolean added4 = client.addIfAbsent("test4", serializer);
- assertTrue(added4);
-
- // ensure that the oldest entry ("test") was evicted under the FIFO policy and that "test3" is still present
- assertFalse(client.contains("test", serializer));
- assertTrue(client.contains("test3", serializer));
-
- client.close();
- server.shutdownServer();
-
- final DistributedSetCacheServer newServer = new SetServer();
- runner.addControllerService("server2", newServer);
- runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
- runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
- runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
- runner.enableControllerService(newServer);
-
- client = createClient(newServer.getPort());
- assertFalse(client.contains("test", serializer));
- assertTrue(client.contains("test2", serializer));
- assertTrue(client.contains("test3", serializer));
- assertTrue(client.contains("test4", serializer));
-
- client.close();
- newServer.shutdownServer();
- }
-
- @Test
- public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
- // Create server
- final DistributedMapCacheServer server = new MapServer();
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- DistributedMapCacheClientService client = new DistributedMapCacheClientService();
- runner.addControllerService("client", client);
- runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
- runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
- runner.enableControllerService(client);
-
- final Serializer<String> valueSerializer = new StringSerializer();
- final Serializer<String> keySerializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
-
- final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
- assertEquals(null, original);
- LOGGER.debug("end getAndPutIfAbsent");
-
- final boolean contains = client.containsKey("testKey", keySerializer);
- assertTrue(contains);
- LOGGER.debug("end containsKey");
-
- final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
- assertFalse(added);
- LOGGER.debug("end putIfAbsent");
-
- final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
- assertEquals("test", originalAfterPut);
- LOGGER.debug("end getAndPutIfAbsent");
-
- final boolean removed = client.remove("testKey", keySerializer);
- assertTrue(removed);
- LOGGER.debug("end remove");
-
- client.put("testKey", "testValue", keySerializer, valueSerializer);
- assertTrue(client.containsKey("testKey", keySerializer));
- String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
- assertEquals("testValue", removedValue);
- removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
- assertNull(removedValue);
-
- final Set<String> keys = client.keySet(deserializer);
- assertEquals(0, keys.size());
-
- // Test removeByPattern, the first two should be removed and the last should remain
- client.put("test.1", "1", keySerializer, keySerializer);
- client.put("test.2", "2", keySerializer, keySerializer);
- client.put("test3", "2", keySerializer, keySerializer);
- final long removedTwo = client.removeByPattern("test\\..*");
- assertEquals(2L, removedTwo);
- assertFalse(client.containsKey("test.1", keySerializer));
- assertFalse(client.containsKey("test.2", keySerializer));
- assertTrue(client.containsKey("test3", keySerializer));
-
- final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
- assertFalse(containedAfterRemove);
-
- client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
- runner.disableControllerService(client);
- try {
- client.containsKey("testKey", keySerializer);
- fail("Should be closed and not accessible");
- } catch (final Exception e) {
-
- }
-
- DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
- runner.addControllerService("client2", client2);
- runner.setProperty(client2, DistributedMapCacheClientService.HOSTNAME, "localhost");
- runner.setProperty(client2, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
- runner.enableControllerService(client2);
-
- assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer));
- assertTrue(client2.containsKey("testKey", keySerializer));
- server.shutdownServer();
- Thread.sleep(1000);
- try {
- client2.containsKey("testKey", keySerializer);
- fail("Should have blown exception!");
- } catch (final ConnectException e) {
- client2 = null;
- }
- LOGGER.debug("end testNonPersistentMapServerAndClient");
- }
-
- @Test
- public void testClientTermination() throws InitializationException, IOException, InterruptedException {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
- // Create server
- final DistributedMapCacheServer server = new MapServer();
- final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server");
- server.initialize(serverInitContext);
-
- final Map<PropertyDescriptor, String> serverProperties = new HashMap<>();
- final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup());
- server.startServer(serverContext);
-
- DistributedMapCacheClientService client = new DistributedMapCacheClientService();
- MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
- client.initialize(clientInitContext);
-
- final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
- clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
- MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
- client.onEnabled(clientContext);
- final Serializer<String> valueSerializer = new StringSerializer();
- final Serializer<String> keySerializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
-
- final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
- assertEquals(null, original);
-
- final boolean contains = client.containsKey("testKey", keySerializer);
- assertTrue(contains);
-
- final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
- assertFalse(added);
-
- final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
- assertEquals("test", originalAfterPut);
-
- final boolean removed = client.remove("testKey", keySerializer);
- assertTrue(removed);
-
- final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
- assertFalse(containedAfterRemove);
-
- client = null;
- clientInitContext = null;
- clientContext = null;
- Thread.sleep(2000);
- System.gc();
- server.shutdownServer();
- }
-
- @Test
- public void testOptimisticLock() throws Exception {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
- // Create server
- final DistributedMapCacheServer server = new MapServer();
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
- MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
- client1.initialize(clientInitContext1);
-
- DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
- MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
- client2.initialize(clientInitContext2);
-
- final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
- clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
-
- MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
- client1.onEnabled(clientContext1);
- MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
- client2.onEnabled(clientContext2);
-
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
-
- final String key = "test-optimistic-lock";
-
- // Ensure there's no existing key
- assertFalse(client1.containsKey(key, stringSerializer));
- assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
-
- // Client 1 inserts the key.
- client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
-
- // Client 1 and 2 fetch the key
- AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
- AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
- assertEquals(new Long(0), c1.getRevision().orElse(0L));
- assertEquals("valueC1-0", c1.getValue());
- assertEquals(new Long(0), c2.getRevision().orElse(0L));
- assertEquals("valueC1-0", c2.getValue());
-
- // Client 1 replace
- c1.setValue("valueC1-1");
- boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer);
- assertTrue("C1 should be able to replace the key", c1Result);
- // Client 2 replace with the old revision
- c2.setValue("valueC2-1");
- boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer);
- assertFalse("C2 shouldn't be able to replace the key", c2Result);
-
- // Client 2 fetch the key again
- c2 = client2.fetch(key, stringSerializer, stringDeserializer);
- assertEquals("valueC1-1", c2.getValue());
- assertEquals(new Long(1), c2.getRevision().orElse(0L));
-
- // Now, Client 2 knows the correct revision so it can replace the key
- c2.setValue("valueC2-2");
- c2Result = client2.replace(c2, stringSerializer, stringSerializer);
- assertTrue("C2 should be able to replace the key", c2Result);
-
- // Assert the cache
- c2 = client2.fetch(key, stringSerializer, stringDeserializer);
- assertEquals("valueC2-2", c2.getValue());
- assertEquals(new Long(2), c2.getRevision().orElse(0L));
-
- client1.close();
- client2.close();
- server.shutdownServer();
- }
-
- @Test
- public void testBackwardCompatibility() throws Exception {
- LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
-
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
-
- // Create a server that only supports protocol version 1.
- final DistributedMapCacheServer server = new MapServer() {
- @Override
- protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
- return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
- @Override
- protected StandardVersionNegotiator getVersionNegotiator() {
- return new StandardVersionNegotiator(1);
- }
- };
- }
- };
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- DistributedMapCacheClientService client = new DistributedMapCacheClientService();
- MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
- client.initialize(clientInitContext1);
-
- final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
- clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
-
- MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
- client.onEnabled(clientContext);
-
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
-
- final String key = "test-backward-compatibility";
-
- // Version 1 operations should work
- client.put(key, "value1", stringSerializer, stringSerializer);
- assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
-
- assertTrue(client.containsKey(key, stringSerializer));
-
- try {
- client.fetch(key, stringSerializer, stringDeserializer);
- fail("Version 2 operations should NOT work.");
- } catch (UnsupportedOperationException e) {
- }
-
- try {
- AtomicCacheEntry<String,String,Long> entry = new AtomicCacheEntry<>(key, "value2", 0L);
- client.replace(entry, stringSerializer, stringSerializer);
- fail("Version 2 operations should NOT work.");
- } catch (UnsupportedOperationException e) {
- }
-
- try {
- Set<String> keys = client.keySet(stringDeserializer);
- fail("Version 3 operations should NOT work.");
- } catch (UnsupportedOperationException e) {
- }
-
- try {
- String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer);
- fail("Version 3 operations should NOT work.");
- } catch (UnsupportedOperationException e) {
- }
-
- try {
- Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer);
- fail("Version 3 operations should NOT work.");
- } catch (UnsupportedOperationException e) {
- }
- client.close();
- server.shutdownServer();
- }
-
- @Test
- public void testLimitServiceReadSizeMap() throws InitializationException, IOException {
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedMapCacheServer server = new MapServer();
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- final DistributedMapCacheClientService client = createMapClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
-
- final String key = "key";
- final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
- final int belowThreshold = maxReadSize / key.length();
- final int aboveThreshold = belowThreshold + 1;
- final String keyBelowThreshold = StringUtils.repeat(key, belowThreshold);
- final String keyAboveThreshold = StringUtils.repeat(key, aboveThreshold);
- assertFalse(client.containsKey(keyBelowThreshold, serializer));
- assertThrows(IOException.class, () -> client.containsKey(keyAboveThreshold, serializer));
- }
-
- @Test
- public void testLimitServiceReadSizeSet() throws InitializationException, IOException {
- final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
- final DistributedSetCacheServer server = new SetServer();
- runner.addControllerService("server", server);
- runner.enableControllerService(server);
-
- final DistributedSetCacheClientService client = createClient(server.getPort());
- final Serializer<String> serializer = new StringSerializer();
-
- final String value = "value";
- final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
- final int belowThreshold = maxReadSize / value.length();
- final int aboveThreshold = belowThreshold + 1;
- final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
- final String valueAboveThreshold = StringUtils.repeat(value, aboveThreshold);
- assertFalse(client.contains(valueBelowThreshold, serializer));
- assertThrows(IOException.class, () -> client.contains(valueAboveThreshold, serializer));
- }
-
- private void waitABit() {
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException e) {
- }
- }
-
- private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
- final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
- final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
- client.initialize(clientInitContext);
-
- final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
- clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
- clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
- final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
- client.onEnabled(clientContext);
-
- return client;
- }
-
- private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException {
- final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
- final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
- client.initialize(clientInitContext);
-
- final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
- clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
- clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
- final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
- client.onEnabled(clientContext);
-
- return client;
- }
-
- private static class StringSerializer implements Serializer<String> {
-
- @Override
- public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
- output.write(value.getBytes(StandardCharsets.UTF_8));
- }
- }
-
- private static class StringDeserializer implements Deserializer<String> {
-
- @Override
- public String deserialize(final byte[] input) throws DeserializationException, IOException {
- return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
- }
- }
-
- private static void deleteRecursively(final File dataFile) throws IOException {
- if (dataFile == null || !dataFile.exists()) {
- return;
- }
-
- final File[] children = dataFile.listFiles();
- for (final File child : children) {
- if (child.isDirectory()) {
- deleteRecursively(child);
- } else {
- for (int i = 0; i < 100 && child.exists(); i++) {
- child.delete();
- }
-
- if (child.exists()) {
- throw new IOException("Could not delete " + dataFile.getAbsolutePath());
- }
- }
- }
- }
-
- private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor> descriptors) {
- descriptors.remove(DistributedCacheServer.PORT);
- descriptors.add(new PropertyDescriptor.Builder()
- .name("Port")
- .description("The port to listen on for incoming connections")
- .required(true)
- .addValidator(StandardValidators.createLongValidator(0L, 65535L, true))
- .defaultValue("0")
- .build());
- return descriptors;
- }
-
- private static class SetServer extends DistributedSetCacheServer {
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return replacePortDescriptor(super.getSupportedPropertyDescriptors());
- }
- }
-
- private static class MapServer extends DistributedMapCacheServer {
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return replacePortDescriptor(super.getSupportedPropertyDescriptors());
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
index 254f925..42e9f56 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
@@ -60,7 +61,7 @@ public class DistributedMapCacheTest {
@BeforeClass
public static void beforeClass() throws Exception {
- final String port = DistributedMapCacheServer.PORT.getDefaultValue();
+ final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
server = new DistributedMapCacheServer();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
index 2608424..9fddfaf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
@@ -22,6 +22,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
@@ -62,7 +63,7 @@ public class DistributedMapCacheTlsTest {
@BeforeClass
public static void beforeClass() throws Exception {
- final String port = DistributedMapCacheServer.PORT.getDefaultValue();
+ final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
sslContextService = createSslContextService();
runner.addControllerService(sslContextService.getIdentifier(), sslContextService);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
new file mode 100644
index 0000000..dc2d0ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDistributedMapServerAndClient {
+
+ private final File dataFile = new File("target/cache-data");
+
+ private TestRunner runner;
+
+ private DistributedMapCacheServer server;
+
+ @BeforeEach
+ public void setRunner() throws InitializationException, IOException {
+ if (dataFile.exists()) {
+ FileUtils.cleanDirectory(dataFile);
+ }
+
+ runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+
+ server = new DistributedMapCacheServer();
+ runner.addControllerService("server", server);
+
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(server, DistributedMapCacheServer.PORT, Integer.toString(port));
+ }
+
+ @AfterEach
+ public void shutdownServer() throws IOException {
+ server.shutdownServer();
+ }
+
+ @Test
+ public void testNonPersistentMapServerAndClient() throws InitializationException, IOException {
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+ try {
+ runner.addControllerService("client", client);
+ runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
+ runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+ runner.enableControllerService(client);
+
+ final Serializer<String> valueSerializer = new StringSerializer();
+ final Serializer<String> keySerializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+
+ final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
+ assertNull(original);
+
+ final boolean contains = client.containsKey("testKey", keySerializer);
+ assertTrue(contains);
+
+ final boolean added = client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
+ assertFalse(added);
+
+ final String originalAfterPut = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer);
+ assertEquals("test", originalAfterPut);
+
+ final boolean removed = client.remove("testKey", keySerializer);
+ assertTrue(removed);
+
+ client.put("testKey", "testValue", keySerializer, valueSerializer);
+ assertTrue(client.containsKey("testKey", keySerializer));
+ String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+ assertEquals("testValue", removedValue);
+ removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
+ assertNull(removedValue);
+
+ final Set<String> keys = client.keySet(deserializer);
+ assertEquals(0, keys.size());
+
+ // Test removeByPattern, the first two should be removed and the last should remain
+ client.put("test.1", "1", keySerializer, keySerializer);
+ client.put("test.2", "2", keySerializer, keySerializer);
+ client.put("test3", "2", keySerializer, keySerializer);
+ final long removedTwo = client.removeByPattern("test\\..*");
+ assertEquals(2L, removedTwo);
+ assertFalse(client.containsKey("test.1", keySerializer));
+ assertFalse(client.containsKey("test.2", keySerializer));
+ assertTrue(client.containsKey("test3", keySerializer));
+
+ final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
+ assertFalse(containedAfterRemove);
+
+ client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
+ runner.disableControllerService(client);
+
+ assertThrows(Exception.class, () -> client.containsKey("testKey", keySerializer));
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testOptimisticLock() throws Exception {
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
+ client1.initialize(clientInitContext1);
+
+ DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
+ client2.initialize(clientInitContext2);
+
+ final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+ clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+ MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+ client1.onEnabled(clientContext1);
+ MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
+ client2.onEnabled(clientContext2);
+
+ try {
+ final Serializer<String> stringSerializer = new StringSerializer();
+ final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+ final String key = "test-optimistic-lock";
+
+ // Ensure there's no existing key
+ assertFalse(client1.containsKey(key, stringSerializer));
+ assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
+
+ // Client 1 inserts the key.
+ client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
+
+ // Client 1 and 2 fetch the key
+ AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
+ AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals(new Long(0), c1.getRevision().orElse(0L));
+ assertEquals("valueC1-0", c1.getValue());
+ assertEquals(new Long(0), c2.getRevision().orElse(0L));
+ assertEquals("valueC1-0", c2.getValue());
+
+ // Client 1 replace
+ c1.setValue("valueC1-1");
+ boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer);
+ assertTrue(c1Result, "C1 should be able to replace the key");
+ // Client 2 replace with the old revision
+ c2.setValue("valueC2-1");
+ boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer);
+ assertFalse(c2Result, "C2 shouldn't be able to replace the key");
+
+ // Client 2 fetch the key again
+ c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals("valueC1-1", c2.getValue());
+ assertEquals(new Long(1), c2.getRevision().orElse(0L));
+
+ // Now, Client 2 knows the correct revision so it can replace the key
+ c2.setValue("valueC2-2");
+ c2Result = client2.replace(c2, stringSerializer, stringSerializer);
+ assertTrue(c2Result, "C2 should be able to replace the key");
+
+ // Assert the cache
+ c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals("valueC2-2", c2.getValue());
+ assertEquals(new Long(2), c2.getRevision().orElse(0L));
+ } finally {
+ client1.close();
+ client2.close();
+ }
+ }
+
+ @Test
+ public void testBackwardCompatibility() throws Exception {
+ // Create a server that only supports protocol version 1.
+ server = new DistributedMapCacheServer() {
+ @Override
+ protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
+ return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
+ @Override
+ protected StandardVersionNegotiator getVersionNegotiator() {
+ return new StandardVersionNegotiator(1);
+ }
+ };
+ }
+ };
+ runner.addControllerService("server", server);
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(server, DistributedMapCacheServer.PORT, Integer.toString(port));
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext1);
+
+ final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+ clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+ MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+ client.onEnabled(clientContext);
+
+ try {
+ final Serializer<String> stringSerializer = new StringSerializer();
+ final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+ final String key = "test-backward-compatibility";
+
+ // Version 1 operations should work
+ client.put(key, "value1", stringSerializer, stringSerializer);
+ assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
+
+ assertTrue(client.containsKey(key, stringSerializer));
+
+ assertThrows(UnsupportedOperationException.class, () -> client.fetch(key, stringSerializer, stringDeserializer));
+
+ AtomicCacheEntry<String, String, Long> entry = new AtomicCacheEntry<>(key, "value2", 0L);
+ assertThrows(UnsupportedOperationException.class, () -> client.replace(entry, stringSerializer, stringSerializer));
+
+ assertThrows(UnsupportedOperationException.class, () -> client.keySet(stringDeserializer));
+ assertThrows(UnsupportedOperationException.class, () -> client.removeAndGet("v.*", stringSerializer, stringDeserializer));
+ assertThrows(UnsupportedOperationException.class, () ->client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer));
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testLimitServiceReadSize() throws InitializationException, IOException {
+ runner.enableControllerService(server);
+
+ final DistributedMapCacheClientService client = createClient(server.getPort());
+ try {
+ final Serializer<String> serializer = new StringSerializer();
+
+ final String value = "value";
+ final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+ final int belowThreshold = maxReadSize / value.length();
+ final int aboveThreshold = belowThreshold + 1;
+ final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
+ final String valueAboveThreshold = StringUtils.repeat(value, aboveThreshold);
+ assertFalse(client.containsKey(valueBelowThreshold, serializer));
+ assertThrows(IOException.class, () -> client.containsKey(valueAboveThreshold, serializer));
+ } finally {
+ client.close();
+ }
+ }
+
+ private DistributedMapCacheClientService createClient(final int port) throws InitializationException {
+ final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+ final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext);
+
+ final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
+ final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
+ client.onEnabled(clientContext);
+
+ return client;
+ }
+
+ private static class StringSerializer implements Serializer<String> {
+
+ @Override
+ public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+ output.write(value.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ private static class StringDeserializer implements Deserializer<String> {
+
+ @Override
+ public String deserialize(final byte[] input) throws DeserializationException {
+ return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
index 7580555..a5b6342 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
@@ -21,6 +21,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
@@ -51,7 +52,7 @@ public class DistributedSetCacheTest {
@BeforeClass
public static void beforeClass() throws Exception {
- final String port = DistributedSetCacheServer.PORT.getDefaultValue();
+ final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
server = new DistributedSetCacheServer();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
new file mode 100644
index 0000000..6484148
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mockito;
+
+public class TestDistributedSetServerAndClient {
+
+ private final File dataFile = new File("target/cache-data");
+
+ private TestRunner runner;
+
+ private DistributedSetCacheServer server;
+
+ /**
+  * Prepares each test: empties the persistence directory when it already exists, then registers a
+  * new Set Cache Server with a fresh Test Runner and assigns it an available TCP port.
+  * The server is not enabled here, so individual tests can set further properties before enabling it.
+  *
+  * @throws InitializationException if thrown while registering the server with the Test Runner
+  * @throws IOException if thrown while cleaning the persistence directory
+  */
+ @BeforeEach
+ public void setRunner() throws InitializationException, IOException {
+     final boolean cacheDirectoryFound = dataFile.exists();
+     if (cacheDirectoryFound) {
+         FileUtils.cleanDirectory(dataFile);
+     }
+
+     final Processor processor = Mockito.mock(Processor.class);
+     runner = TestRunners.newTestRunner(processor);
+
+     server = new DistributedSetCacheServer();
+     runner.addControllerService("server", server);
+
+     final String availablePort = Integer.toString(NetworkUtils.getAvailableTcpPort());
+     runner.setProperty(server, DistributedSetCacheServer.PORT, availablePort);
+ }
+
+ /**
+  * Stops the Set Cache Server after each test.
+  * The server reference is checked for null so that a failure in {@link #setRunner()} before the
+  * server was created is not followed by a NullPointerException from this method.
+  *
+  * @throws IOException if thrown while shutting down the server
+  */
+ @AfterEach
+ public void shutdownServer() throws IOException {
+     if (server != null) {
+         server.shutdownServer();
+     }
+ }
+
+ /**
+  * Enables the server without a persistence path and exercises add, contains, and remove
+  * for a single value through the client.
+  */
+ @Test
+ public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
+     runner.enableControllerService(server);
+
+     final int serverPort = server.getPort();
+     final DistributedSetCacheClientService client = createClient(serverPort);
+     try {
+         final String element = "test";
+         final Serializer<String> serializer = new StringSerializer();
+
+         // the first add succeeds and the value becomes visible
+         assertTrue(client.addIfAbsent(element, serializer));
+         assertTrue(client.contains(element, serializer));
+
+         // adding the same value again reports that nothing was added
+         assertFalse(client.addIfAbsent(element, serializer));
+
+         // removal succeeds and the value is no longer found
+         assertTrue(client.remove(element, serializer));
+         assertFalse(client.contains(element, serializer));
+     } finally {
+         client.close();
+     }
+ }
+
+ /**
+  * Enables the server with a persistence path and exercises add, contains, and remove
+  * through the client using two values.
+  */
+ @Test
+ public void testPersistentSetServerAndClient() throws InitializationException, IOException {
+     final String persistencePath = dataFile.getAbsolutePath();
+     runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, persistencePath);
+     runner.enableControllerService(server);
+
+     final DistributedSetCacheClientService client = createClient(server.getPort());
+     try {
+         final Serializer<String> serializer = new StringSerializer();
+
+         // both additions are issued before either result is asserted
+         final boolean firstAdded = client.addIfAbsent("test", serializer);
+         final boolean secondAdded = client.addIfAbsent("test2", serializer);
+         assertTrue(firstAdded);
+         assertTrue(secondAdded);
+
+         // both lookups are issued before either result is asserted
+         final boolean firstFound = client.contains("test", serializer);
+         final boolean secondFound = client.contains("test2", serializer);
+         assertTrue(firstFound);
+         assertTrue(secondFound);
+
+         // a duplicate add is rejected, then the value is removed and no longer found
+         assertFalse(client.addIfAbsent("test", serializer));
+         assertTrue(client.remove("test", serializer));
+         assertFalse(client.contains("test", serializer));
+     } finally {
+         client.close();
+     }
+ }
+
+ /**
+  * Configures the persistent server with at most three entries and the LFU eviction strategy,
+  * then verifies that "test3", the only entry not accessed again after being added, is gone
+  * once a fourth entry is added.
+  */
+ @Test
+ public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
+     final String persistencePath = dataFile.getAbsolutePath();
+     runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, persistencePath);
+     runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+     runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
+     runner.enableControllerService(server);
+
+     final DistributedSetCacheClientService client = createClient(server.getPort());
+     try {
+         final Serializer<String> serializer = new StringSerializer();
+
+         // fill the cache to its configured maximum, pausing after every addition
+         final boolean firstAdded = client.addIfAbsent("test", serializer);
+         waitABit();
+         final boolean secondAdded = client.addIfAbsent("test2", serializer);
+         waitABit();
+         final boolean thirdAdded = client.addIfAbsent("test3", serializer);
+         waitABit();
+         assertTrue(firstAdded);
+         assertTrue(secondAdded);
+         assertTrue(thirdAdded);
+
+         // access "test" and "test2" again; "test3" is deliberately left untouched
+         final boolean firstFound = client.contains("test", serializer);
+         final boolean secondFound = client.contains("test2", serializer);
+         assertTrue(firstFound);
+         assertTrue(secondFound);
+
+         assertFalse(client.addIfAbsent("test", serializer));
+
+         // a fourth entry exceeds the maximum and forces an eviction
+         assertTrue(client.addIfAbsent("test4", serializer));
+
+         // "test3" must have been evicted because it was used least frequently
+         final boolean thirdFound = client.contains("test3", serializer);
+         assertFalse(thirdFound);
+     } finally {
+         client.close();
+     }
+ }
+
+ /**
+  * Configures the persistent server with at most three entries and the FIFO eviction strategy,
+  * then verifies that "test", the first entry added, is gone once a fourth entry is added,
+  * while "test3" is still present.
+  */
+ @Test
+ public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
+     runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+     runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
+     runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
+     runner.enableControllerService(server);
+
+     final DistributedSetCacheClientService client = createClient(server.getPort());
+     try {
+         final Serializer<String> serializer = new StringSerializer();
+
+         // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond
+         // for the entry time so we don't know which entry will be evicted. So we wait a few millis in between
+         final boolean added = client.addIfAbsent("test", serializer);
+         waitABit();
+         final boolean added2 = client.addIfAbsent("test2", serializer);
+         waitABit();
+         final boolean added3 = client.addIfAbsent("test3", serializer);
+         waitABit();
+
+         assertTrue(added);
+         assertTrue(added2);
+         assertTrue(added3);
+
+         final boolean contains = client.contains("test", serializer);
+         final boolean contains2 = client.contains("test2", serializer);
+         assertTrue(contains);
+         assertTrue(contains2);
+
+         final boolean addedAgain = client.addIfAbsent("test", serializer);
+         assertFalse(addedAgain);
+
+         final boolean added4 = client.addIfAbsent("test4", serializer);
+         assertTrue(added4);
+
+         // ensure that "test" was evicted because it was the first entry added, regardless of the
+         // later accesses to it, and that the most recently added of the original entries remains
+         assertFalse(client.contains("test", serializer));
+         assertTrue(client.contains("test3", serializer));
+     } finally {
+         client.close();
+     }
+ }
+
+ /**
+  * Verifies that a lookup for a value within the server's default maximum read size completes,
+  * and that a lookup for a value one repetition longer fails on the client with an IOException.
+  */
+ @Test
+ public void testLimitServiceReadSize() throws InitializationException, IOException {
+     runner.enableControllerService(server);
+
+     final DistributedSetCacheClientService client = createClient(server.getPort());
+     try {
+         final Serializer<String> serializer = new StringSerializer();
+
+         final String segment = "value";
+         final String defaultMaxReadSize = DistributedCacheServer.MAX_READ_SIZE.getDefaultValue();
+         final int maxReadBytes = new MockPropertyValue(defaultMaxReadSize).asDataSize(DataUnit.B).intValue();
+
+         // largest repeat count whose total length does not exceed the default maximum read size
+         final int repeatsWithinLimit = maxReadBytes / segment.length();
+         final int repeatsOverLimit = repeatsWithinLimit + 1;
+         final String withinLimit = StringUtils.repeat(segment, repeatsWithinLimit);
+         final String overLimit = StringUtils.repeat(segment, repeatsOverLimit);
+
+         assertFalse(client.contains(withinLimit, serializer));
+         assertThrows(IOException.class, () -> client.contains(overLimit, serializer));
+     } finally {
+         client.close();
+     }
+ }
+
+ /**
+  * Pauses the calling thread for 10 milliseconds so that consecutive cache operations are
+  * separated in time.
+  *
+  * @throws RuntimeException wrapping the InterruptedException if the sleep is interrupted;
+  *         the interrupt status of the thread is restored before throwing
+  */
+ private void waitABit() {
+     try {
+         TimeUnit.MILLISECONDS.sleep(10);
+     } catch (final InterruptedException e) {
+         // sleep() clears the interrupt status when it throws; set it again so that code further
+         // up the stack can still observe the interruption
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Creates a Set Cache Client Service, initializes it with a mock initialization context, and
+  * enables it against "localhost" on the given port.
+  *
+  * @param port port number the client is configured to connect to
+  * @return the initialized and enabled client; callers in this class close it when finished
+  * @throws InitializationException if thrown while initializing the client
+  */
+ private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
+     final DistributedSetCacheClientService service = new DistributedSetCacheClientService();
+     final MockControllerServiceInitializationContext initializationContext =
+             new MockControllerServiceInitializationContext(service, "client");
+     service.initialize(initializationContext);
+
+     final Map<PropertyDescriptor, String> properties = new HashMap<>();
+     properties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
+     properties.put(DistributedSetCacheClientService.PORT, Integer.toString(port));
+
+     final MockConfigurationContext configurationContext =
+             new MockConfigurationContext(properties, initializationContext.getControllerServiceLookup());
+     service.onEnabled(configurationContext);
+
+     return service;
+ }
+
+ /**
+  * Serializer that writes a String to the output stream as its UTF-8 encoded bytes.
+  */
+ private static class StringSerializer implements Serializer<String> {
+
+     /**
+      * Encodes the value as UTF-8 and writes the resulting bytes to the stream.
+      *
+      * @param value String to be written
+      * @param output stream receiving the encoded bytes
+      * @throws IOException if writing to the stream fails
+      */
+     @Override
+     public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
+         final byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
+         output.write(encoded);
+     }
+ }
+}