You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/03/13 19:08:41 UTC
nifi git commit: NIFI-3582: Fixing PersistentMapCache bug that
skipped persistent evictions
Repository: nifi
Updated Branches:
refs/heads/master 9d4239be1 -> bd9139010
NIFI-3582: Fixing PersistentMapCache bug that skipped persistent evictions
This closes #1592.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd913901
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd913901
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd913901
Branch: refs/heads/master
Commit: bd9139010528b1ee2eb6351956e18cc592f1487d
Parents: 9d4239b
Author: Joe Gresock <jo...@lmco.com>
Authored: Mon Mar 13 16:01:37 2017 +0000
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Mar 13 15:08:25 2017 -0400
----------------------------------------------------------------------
.../cache/server/map/PersistentMapCache.java | 3 +-
.../cache/server/TestServerAndClient.java | 81 ++++++++++++++++++++
2 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/bd913901/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 62deae5..da457bd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,7 +79,7 @@ public class PersistentMapCache implements MapCache {
records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
}
- wali.update(Collections.singletonList(record), false);
+ wali.update(records, false);
final long modCount = modifications.getAndIncrement();
if ( modCount > 0 && modCount % 100000 == 0 ) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/bd913901/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index c8f9478..0f5675c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -236,6 +236,73 @@ public class TestServerAndClient {
}
@Test
+ public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
+ /**
+ * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+ * See: https://issues.apache.org/jira/browse/NIFI-437
+ */
+ Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+ SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+ LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+ // Create server
+ final File dataFile = new File("target/cache-data");
+ deleteRecursively(dataFile);
+
+ // Create server
+ final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+ final DistributedMapCacheServer server = new MapServer();
+ runner.addControllerService("server", server);
+ runner.setProperty(server, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+ runner.setProperty(server, DistributedMapCacheServer.MAX_CACHE_ENTRIES, "3");
+ runner.setProperty(server, DistributedMapCacheServer.EVICTION_POLICY, DistributedMapCacheServer.EVICTION_STRATEGY_LFU);
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client = createMapClient(server.getPort());
+ final Serializer<String> serializer = new StringSerializer();
+ final boolean added = client.putIfAbsent("test", "1", serializer, serializer);
+ waitABit();
+ final boolean added2 = client.putIfAbsent("test2", "2", serializer, serializer);
+ waitABit();
+ final boolean added3 = client.putIfAbsent("test3", "3", serializer, serializer);
+ waitABit();
+ assertTrue(added);
+ assertTrue(added2);
+ assertTrue(added3);
+
+ final boolean contains = client.containsKey("test", serializer);
+ final boolean contains2 = client.containsKey("test2", serializer);
+ assertTrue(contains);
+ assertTrue(contains2);
+
+ final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
+ assertFalse(addedAgain);
+
+ final boolean added4 = client.putIfAbsent("test4", "4", serializer, serializer);
+ assertTrue(added4);
+
+ // ensure that added3 was evicted because it was used least frequently
+ assertFalse(client.containsKey("test3", serializer));
+
+ server.shutdownServer();
+
+ final DistributedMapCacheServer newServer = new MapServer();
+ runner.addControllerService("server2", newServer);
+ runner.setProperty(newServer, DistributedMapCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
+ runner.enableControllerService(newServer);
+ client.close();
+ client = createMapClient(newServer.getPort());
+
+ assertTrue(client.containsKey("test", serializer));
+ assertTrue(client.containsKey("test2", serializer));
+ assertFalse(client.containsKey("test3", serializer));
+ assertTrue(client.containsKey("test4", serializer));
+
+ newServer.shutdownServer();
+ client.close();
+ }
+
+ @Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
@@ -628,6 +695,20 @@ public class TestServerAndClient {
return client;
}
+ private DistributedMapCacheClientService createMapClient(final int port) throws InitializationException {
+ final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+ final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext);
+
+ final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
+ final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup());
+ client.cacheConfig(clientContext);
+
+ return client;
+ }
+
private static class StringSerializer implements Serializer<String> {
@Override