You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/03/13 19:08:41 UTC

nifi git commit: NIFI-3582: Fixing PersistentMapCache bug that skipped persistent evictions

Repository: nifi
Updated Branches:
  refs/heads/master 9d4239be1 -> bd9139010


NIFI-3582: Fixing PersistentMapCache bug that skipped persistent evictions

This closes #1592.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd913901
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd913901
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd913901

Branch: refs/heads/master
Commit: bd9139010528b1ee2eb6351956e18cc592f1487d
Parents: 9d4239b
Author: Joe Gresock <jo...@lmco.com>
Authored: Mon Mar 13 16:01:37 2017 +0000
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Mar 13 15:08:25 2017 -0400

----------------------------------------------------------------------
 .../cache/server/map/PersistentMapCache.java    |  3 +-
 .../cache/server/TestServerAndClient.java       | 81 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bd913901/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 62deae5..da457bd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -80,7 +79,7 @@ public class PersistentMapCache implements MapCache {
                 records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
             }
 
-            wali.update(Collections.singletonList(record), false);
+            wali.update(records, false);
 
             final long modCount = modifications.getAndIncrement();
             if ( modCount > 0 && modCount % 100000 == 0 ) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd913901/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index c8f9478..0f5675c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -236,6 +236,73 @@ public class TestServerAndClient {
     }
 
     @Test
+    public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+            SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+        // Clear any existing persistence directory (target/cache-data) before starting
+        final File dataFile = new File("target/cache-data");
+        deleteRecursively(dataFile);
+
+        // Create a persistent map cache server limited to 3 entries with the LFU eviction strategy
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedMapCacheServer server = new MapServer();
+        runner.addControllerService("server", server);
+        runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+        runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3");
+        runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU);
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client = createMapClient(server.getPort());
+        final Serializer<String> serializer = new StringSerializer();
+        final boolean added = client.putIfAbsent("test", "1", serializer, serializer); // fill the cache to its 3-entry limit
+        waitABit();
+        final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer);
+        waitABit();
+        final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer);
+        waitABit();
+        assertTrue(added);
+        assertTrue(added2);
+        assertTrue(added3);
+
+        final boolean contains = client.containsKey("test", serializer); // touch test and test2, but not test3
+        final boolean contains2 = client.containsKey("test2", serializer);
+        assertTrue(contains);
+        assertTrue(contains2);
+
+        final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); // key already present, so nothing is added
+        assertFalse(addedAgain);
+
+        final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer); // a 4th key exceeds the limit of 3
+        assertTrue(added4);
+
+        // test3 is the only entry not accessed since it was inserted, so LFU eviction should have removed it to make room for test4
+        assertFalse(client.containsKey("test3", serializer));
+
+        server.shutdownServer(); // stop the first server; a second one is started below on the same persistence path
+
+        final DistributedMapCacheServer newServer = new MapServer();
+        runner.addControllerService("server2", newServer);
+        runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); // only the path is set here
+        runner.enableControllerService(newServer);
+        client.close();
+        client = createMapClient(newServer.getPort());
+
+        assertTrue(client.containsKey("test", serializer)); // surviving entries must be visible through the new server
+        assertTrue(client.containsKey("test2", serializer));
+        assertFalse(client.containsKey("test3", serializer)); // the evicted entry must not reappear after the restart
+        assertTrue(client.containsKey("test4", serializer));
+
+        newServer.shutdownServer();
+        client.close();
+    }
+
+    @Test
     public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
         /**
          * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
@@ -628,6 +695,20 @@ public class TestServerAndClient {
         return client;
     }
 
+    private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException { // client for localhost:<port>
+        final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+        final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+        client.initialize(clientInitContext); // initialized with a mock context; "client" is the identifier passed to it
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port)); // property values are strings
+        final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
+        client.cacheConfig(clientContext); // NOTE(review): presumably stores the host/port configuration for later use - confirm
+
+        return client;
+    }
+
     private static class StringSerializer implements Serializer<String> {
 
         @Override