You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/16 20:05:32 UTC
[04/19] incubator-nifi git commit: NIFI-169 well it finally all
builds. There is a classpath issue still to sort out which impacts startup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
new file mode 100644
index 0000000..2c85cd8
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+
+public interface CacheServer {
+
+ void start() throws IOException;
+ void stop() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
new file mode 100644
index 0000000..0f962d0
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.processor.annotation.OnShutdown;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+public abstract class DistributedCacheServer extends AbstractControllerService {
+ public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
+ public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
+ public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
+
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Port")
+ .description("The port to listen on for incoming connections")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .defaultValue("4557")
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description(
+ "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+ .required(false)
+ .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .build();
+ public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
+ .name("Maximum Cache Entries")
+ .description("The maximum number of cache entries that the cache can hold")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10000")
+ .build();
+ public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
+ .name("Eviction Strategy")
+ .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
+ .required(true)
+ .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
+ .defaultValue(EVICTION_STRATEGY_LFU)
+ .build();
+ public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
+ .name("Persistence Directory")
+ .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
+ .required(false)
+ .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+ .build();
+
+ private volatile CacheServer cacheServer;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(PORT);
+ properties.add(MAX_CACHE_ENTRIES);
+ properties.add(EVICTION_POLICY);
+ properties.add(PERSISTENCE_PATH);
+ properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
+ getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+ return properties;
+ }
+
+ @OnConfigured
+ public void startServer(final ConfigurationContext context) throws IOException {
+ if (cacheServer == null) {
+ cacheServer = createCacheServer(context);
+ cacheServer.start();
+ }
+ }
+
+ @OnShutdown
+ public void shutdownServer() throws IOException {
+ if (cacheServer != null) {
+ cacheServer.stop();
+ }
+ cacheServer = null;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ shutdownServer();
+ }
+
+ protected abstract CacheServer createCacheServer(ConfigurationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
new file mode 100644
index 0000000..426573f
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+public class DistributedSetCacheServer extends DistributedCacheServer {
+
+ @Override
+ protected CacheServer createCacheServer(final ConfigurationContext context) {
+ final int port = context.getProperty(PORT).asInteger();
+ final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+ final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+
+ final SSLContext sslContext;
+ if ( sslContextService == null ) {
+ sslContext = null;
+ } else {
+ sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
+ }
+
+ final EvictionPolicy evictionPolicy;
+ switch (evictionPolicyName) {
+ case EVICTION_STRATEGY_FIFO:
+ evictionPolicy = EvictionPolicy.FIFO;
+ break;
+ case EVICTION_STRATEGY_LFU:
+ evictionPolicy = EvictionPolicy.LFU;
+ break;
+ case EVICTION_STRATEGY_LRU:
+ evictionPolicy = EvictionPolicy.LRU;
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+ }
+
+ try {
+ final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
+
+ return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
new file mode 100644
index 0000000..60bd2c1
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.Comparator;
+
+public enum EvictionPolicy {
+ LFU(new LFUComparator()),
+ LRU(new LRUComparator()),
+ FIFO(new FIFOComparator());
+
+ private final Comparator<CacheRecord> comparator;
+
+ private EvictionPolicy(final Comparator<CacheRecord> comparator) {
+ this.comparator = comparator;
+ }
+
+ public Comparator<CacheRecord> getComparator() {
+ return comparator;
+ }
+
+ public static class LFUComparator implements Comparator<CacheRecord> {
+ @Override
+ public int compare(final CacheRecord o1, final CacheRecord o2) {
+ if ( o1.equals(o2) ) {
+ return 0;
+ }
+
+ final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount());
+ final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison;
+ return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
+ }
+ }
+
+ public static class LRUComparator implements Comparator<CacheRecord> {
+ @Override
+ public int compare(final CacheRecord o1, final CacheRecord o2) {
+ if ( o1.equals(o2) ) {
+ return 0;
+ }
+
+ final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate());
+ return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison);
+ }
+ }
+
+ public static class FIFOComparator implements Comparator<CacheRecord> {
+ @Override
+ public int compare(final CacheRecord o1, final CacheRecord o2) {
+ if ( o1.equals(o2) ) {
+ return 0;
+ }
+
+ final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate());
+ return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
new file mode 100644
index 0000000..5d2c0f6
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
+import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
+import org.apache.nifi.io.DataOutputStream;
+
+public class SetCacheServer extends AbstractCacheServer {
+
+ private final SetCache cache;
+
+ public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+ final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+ super(identifier, sslContext, port);
+
+ final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
+
+ if (persistencePath == null) {
+ this.cache = simpleCache;
+ } else {
+ final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
+ persistentCache.restore();
+ this.cache = persistentCache;
+ }
+ }
+
+ @Override
+ protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+ final DataInputStream dis = new DataInputStream(in);
+ final DataOutputStream dos = new DataOutputStream(out);
+
+ final String action = dis.readUTF();
+ if (action.equals("close")) {
+ return false;
+ }
+
+ final int valueLength = dis.readInt();
+ final byte[] value = new byte[valueLength];
+ dis.readFully(value);
+ final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
+
+ final SetCacheResult response;
+ switch (action) {
+ case "addIfAbsent":
+ response = cache.addIfAbsent(valueBuffer);
+ break;
+ case "contains":
+ response = cache.contains(valueBuffer);
+ break;
+ case "remove":
+ response = cache.remove(valueBuffer);
+ break;
+ default:
+ throw new IOException("IllegalRequest");
+ }
+
+ dos.writeBoolean(response.getResult());
+ dos.flush();
+
+ return true;
+ }
+
+ @Override
+ public void stop() throws IOException {
+ try {
+ super.stop();
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!stopped)
+ stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
new file mode 100644
index 0000000..920529d
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.server.CacheServer;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+public class DistributedMapCacheServer extends DistributedCacheServer {
+
+ @Override
+ protected CacheServer createCacheServer(final ConfigurationContext context) {
+ final int port = context.getProperty(PORT).asInteger();
+ final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+ final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+
+ final SSLContext sslContext;
+ if ( sslContextService == null ) {
+ sslContext = null;
+ } else {
+ sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
+ }
+
+ final EvictionPolicy evictionPolicy;
+ switch (evictionPolicyName) {
+ case EVICTION_STRATEGY_FIFO:
+ evictionPolicy = EvictionPolicy.FIFO;
+ break;
+ case EVICTION_STRATEGY_LFU:
+ evictionPolicy = EvictionPolicy.LFU;
+ break;
+ case EVICTION_STRATEGY_LRU:
+ evictionPolicy = EvictionPolicy.LRU;
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+ }
+
+ try {
+ final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
+
+ return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
new file mode 100644
index 0000000..534cb0b
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface MapCache {
+
+ MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+ boolean containsKey(ByteBuffer key) throws IOException;
+ ByteBuffer get(ByteBuffer key) throws IOException;
+ ByteBuffer remove(ByteBuffer key) throws IOException;
+ void shutdown() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
new file mode 100644
index 0000000..b0ab0c4
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.distributed.cache.server.CacheRecord;
+
+public class MapCacheRecord extends CacheRecord {
+ private final ByteBuffer key;
+ private final ByteBuffer value;
+
+ public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public ByteBuffer getKey() {
+ return key;
+ }
+
+ public ByteBuffer getValue() {
+ return value;
+ }
+
+ @Override
+ public int hashCode() {
+ return 2938476 + key.hashCode() * value.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if ( obj == this ) {
+ return true;
+ }
+
+ if ( obj instanceof MapCacheRecord ) {
+ final MapCacheRecord that = ((MapCacheRecord) obj);
+ return key.equals(that.key) && value.equals(that.value);
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
new file mode 100644
index 0000000..3e8dd0e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.io.DataOutputStream;
+
+public class MapCacheServer extends AbstractCacheServer {
+
+ private final MapCache cache;
+
+ public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+ final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+ super(identifier, sslContext, port);
+
+ final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
+
+ if (persistencePath == null) {
+ this.cache = simpleCache;
+ } else {
+ final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
+ persistentCache.restore();
+ this.cache = persistentCache;
+ }
+ }
+
+ @Override
+ protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+ final DataInputStream dis = new DataInputStream(in);
+ final DataOutputStream dos = new DataOutputStream(out);
+ final String action = dis.readUTF();
+ try {
+ switch (action) {
+ case "close": {
+ return false;
+ }
+ case "putIfAbsent": {
+ final byte[] key = readValue(dis);
+ final byte[] value = readValue(dis);
+ final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ dos.writeBoolean(putResult.isSuccessful());
+ break;
+ }
+ case "containsKey": {
+ final byte[] key = readValue(dis);
+ final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
+ dos.writeBoolean(contains);
+ break;
+ }
+ case "getAndPutIfAbsent": {
+ final byte[] key = readValue(dis);
+ final byte[] value = readValue(dis);
+
+ final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ if (putResult.isSuccessful()) {
+ // Put was successful. There was no old value to get.
+ dos.writeInt(0);
+ } else {
+ // we didn't put. Write back the previous value
+ final byte[] byteArray = putResult.getExistingValue().array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
+ }
+
+ break;
+ }
+ case "get": {
+ final byte[] key = readValue(dis);
+ final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+ if (existingValue == null) {
+ // there was no existing value; we did a "put".
+ dos.writeInt(0);
+ } else {
+ // a value already existed. we did not update the map
+ final byte[] byteArray = existingValue.array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
+ }
+
+ break;
+ }
+ case "remove": {
+ final byte[] key = readValue(dis);
+ final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
+ dos.writeBoolean(removed);
+ break;
+ }
+ default: {
+ throw new IOException("Illegal Request");
+ }
+ }
+ } finally {
+ dos.flush();
+ }
+
+ return true;
+ }
+
+ @Override
+ public void stop() throws IOException {
+ try {
+ super.stop();
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!stopped)
+ stop();
+ }
+
+ private byte[] readValue(final DataInputStream dis) throws IOException {
+ final int numBytes = dis.readInt();
+ final byte[] buffer = new byte[numBytes];
+ dis.readFully(buffer);
+ return buffer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
new file mode 100644
index 0000000..29695eb
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.nio.ByteBuffer;
+
+public class MapPutResult {
+ private final boolean successful;
+ private final ByteBuffer key, value;
+ private final ByteBuffer existingValue;
+ private final ByteBuffer evictedKey, evictedValue;
+
+ public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
+ this.successful = successful;
+ this.key = key;
+ this.value = value;
+ this.existingValue = existingValue;
+ this.evictedKey = evictedKey;
+ this.evictedValue = evictedValue;
+ }
+
+ public boolean isSuccessful() {
+ return successful;
+ }
+
+ public ByteBuffer getKey() {
+ return key;
+ }
+
+ public ByteBuffer getValue() {
+ return value;
+ }
+
+ public ByteBuffer getExistingValue() {
+ return existingValue;
+ }
+
+ public ByteBuffer getEvictedKey() {
+ return evictedKey;
+ }
+
+ public ByteBuffer getEvictedValue() {
+ return evictedValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
new file mode 100644
index 0000000..77fb77d
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+import org.wali.WriteAheadRepository;
+
+public class PersistentMapCache implements MapCache {
+
+ private final MapCache wrapped;
+ private final WriteAheadRepository<MapWaliRecord> wali;
+
+ private final AtomicLong modifications = new AtomicLong(0L);
+
+ public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException {
+ wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
+ wrapped = cacheToWrap;
+ }
+
+ synchronized void restore() throws IOException {
+ final Collection<MapWaliRecord> recovered = wali.recoverRecords();
+ for ( final MapWaliRecord record : recovered ) {
+ if ( record.getUpdateType() == UpdateType.CREATE ) {
+ wrapped.putIfAbsent(record.getKey(), record.getValue());
+ }
+ }
+ }
+
+ @Override
+ public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
+ final MapPutResult putResult = wrapped.putIfAbsent(key, value);
+ if ( putResult.isSuccessful() ) {
+ // The put was successful.
+ final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
+ final List<MapWaliRecord> records = new ArrayList<>();
+ records.add(record);
+
+ if ( putResult.getEvictedKey() != null ) {
+ records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+ }
+
+ wali.update(Collections.singletonList(record), false);
+
+ final long modCount = modifications.getAndIncrement();
+ if ( modCount > 0 && modCount % 100000 == 0 ) {
+ wali.checkpoint();
+ }
+ }
+
+ return putResult;
+ }
+
+ @Override
+ public boolean containsKey(final ByteBuffer key) throws IOException {
+ return wrapped.containsKey(key);
+ }
+
+ @Override
+ public ByteBuffer get(final ByteBuffer key) throws IOException {
+ return wrapped.get(key);
+ }
+
+ @Override
+ public ByteBuffer remove(ByteBuffer key) throws IOException {
+ final ByteBuffer removeResult = wrapped.remove(key);
+ if ( removeResult != null ) {
+ final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
+ final List<MapWaliRecord> records = new ArrayList<>(1);
+ records.add(record);
+ wali.update(records, false);
+
+ final long modCount = modifications.getAndIncrement();
+ if ( modCount > 0 && modCount % 1000 == 0 ) {
+ wali.checkpoint();
+ }
+ }
+ return removeResult;
+ }
+
+
+ @Override
+ public void shutdown() throws IOException {
+ wali.shutdown();
+ }
+
+
+ private static class MapWaliRecord {
+ private final UpdateType updateType;
+ private final ByteBuffer key;
+ private final ByteBuffer value;
+
+ public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) {
+ this.updateType = updateType;
+ this.key = key;
+ this.value = value;
+ }
+
+ public UpdateType getUpdateType() {
+ return updateType;
+ }
+
+ public ByteBuffer getKey() {
+ return key;
+ }
+
+ public ByteBuffer getValue() {
+ return value;
+ }
+ }
+
+ private static class Serde implements SerDe<MapWaliRecord> {
+
+ @Override
+ public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
+ final UpdateType updateType = newRecordState.getUpdateType();
+ if ( updateType == UpdateType.DELETE ) {
+ out.write(0);
+ } else {
+ out.write(1);
+ }
+
+ final byte[] key = newRecordState.getKey().array();
+ final byte[] value = newRecordState.getValue().array();
+
+ out.writeInt(key.length);
+ out.write(key);
+ out.writeInt(value.length);
+ out.write(value);
+ }
+
+ @Override
+ public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException {
+ serializeEdit(null, record, out);
+ }
+
+ @Override
+ public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException {
+ final int updateTypeValue = in.read();
+ if ( updateTypeValue < 0 ) {
+ throw new EOFException();
+ }
+
+ final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE);
+
+ final int keySize = in.readInt();
+ final byte[] key = new byte[keySize];
+ in.readFully(key);
+
+ final int valueSize = in.readInt();
+ final byte[] value = new byte[valueSize];
+ in.readFully(value);
+
+ return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ }
+
+ @Override
+ public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException {
+ return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version);
+ }
+
+ @Override
+ public Object getRecordIdentifier(final MapWaliRecord record) {
+ return record.getKey();
+ }
+
+ @Override
+ public UpdateType getUpdateType(final MapWaliRecord record) {
+ return record.getUpdateType();
+ }
+
+ @Override
+ public String getLocation(final MapWaliRecord record) {
+ return null;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
new file mode 100644
index 0000000..10139f1
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleMapCache implements MapCache {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
+
+ private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
+ private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ private final String serviceIdentifier;
+
+ private final int maxSize;
+
+ public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
+ // need to change to ConcurrentMap as this is modified when only the readLock is held
+ inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
+ this.serviceIdentifier = serviceIdentifier;
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleSetCache[service id=" + serviceIdentifier + "]";
+ }
+
+ // don't need synchronized because this method is only called when the writeLock is held, and all
+ // public methods obtain either the read or write lock
+ private MapCacheRecord evict() {
+ if ( cache.size() < maxSize ) {
+ return null;
+ }
+
+ final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
+ final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
+ cache.remove(valueToEvict);
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
+ }
+
+ return recordToEvict;
+ }
+
+ @Override
+ public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) {
+ writeLock.lock();
+ try {
+ final MapCacheRecord record = cache.get(key);
+ if ( record == null ) {
+ // Record is null. We will add.
+ final MapCacheRecord evicted = evict();
+ final MapCacheRecord newRecord = new MapCacheRecord(key, value);
+ cache.put(key, newRecord);
+ inverseCacheMap.put(newRecord, key);
+
+ if ( evicted == null ) {
+ return new MapPutResult(true, key, value, null, null, null);
+ } else {
+ return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
+ }
+ }
+
+ // Record is not null. Increment hit count and return result indicating that record was not added.
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, key);
+
+ return new MapPutResult(false, key, value, record.getValue(), null, null);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean containsKey(final ByteBuffer key) {
+ readLock.lock();
+ try {
+ final MapCacheRecord record = cache.get(key);
+ if ( record == null ) {
+ return false;
+ }
+
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, key);
+
+ return true;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public ByteBuffer get(final ByteBuffer key) {
+ readLock.lock();
+ try {
+ final MapCacheRecord record = cache.get(key);
+ if ( record == null ) {
+ return null;
+ }
+
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, key);
+
+ return record.getValue();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public ByteBuffer remove(ByteBuffer key) throws IOException {
+ writeLock.lock();
+ try {
+ final MapCacheRecord record = cache.remove(key);
+ if (record == null) {
+ return null;
+ }
+ inverseCacheMap.remove(record);
+ return record.getValue();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
new file mode 100644
index 0000000..4d75fc0
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+import org.wali.WriteAheadRepository;
+
+public class PersistentSetCache implements SetCache {
+
+ private final SetCache wrapped;
+ private final WriteAheadRepository<SetRecord> wali;
+
+ private final AtomicLong modifications = new AtomicLong(0L);
+
+ public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException {
+ wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
+ wrapped = cacheToWrap;
+ }
+
+ public synchronized void restore() throws IOException {
+ final Collection<SetRecord> recovered = wali.recoverRecords();
+ for ( final SetRecord record : recovered ) {
+ if ( record.getUpdateType() == UpdateType.CREATE ) {
+ addIfAbsent(record.getBuffer());
+ }
+ }
+ }
+
+ @Override
+ public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException {
+ final SetCacheResult removeResult = wrapped.remove(value);
+ if ( removeResult.getResult() ) {
+ final SetRecord record = new SetRecord(UpdateType.DELETE, value);
+ final List<SetRecord> records = new ArrayList<>();
+ records.add(record);
+ wali.update(records, false);
+
+ final long modCount = modifications.getAndIncrement();
+ if ( modCount > 0 && modCount % 1000 == 0 ) {
+ wali.checkpoint();
+ }
+ }
+
+ return removeResult;
+ }
+
+ @Override
+ public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException {
+ final SetCacheResult addResult = wrapped.addIfAbsent(value);
+ if ( addResult.getResult() ) {
+ final SetRecord record = new SetRecord(UpdateType.CREATE, value);
+ final List<SetRecord> records = new ArrayList<>();
+ records.add(record);
+
+ final SetCacheRecord evictedRecord = addResult.getEvictedRecord();
+ if ( evictedRecord != null ) {
+ records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue()));
+ }
+
+ wali.update(records, false);
+
+ final long modCount = modifications.getAndIncrement();
+ if ( modCount > 0 && modCount % 1000 == 0 ) {
+ wali.checkpoint();
+ }
+ }
+
+ return addResult;
+ }
+
+ @Override
+ public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException {
+ return wrapped.contains(value);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ wali.shutdown();
+ }
+
+ private static class SetRecord {
+ private final UpdateType updateType;
+ private final ByteBuffer value;
+
+ public SetRecord(final UpdateType updateType, final ByteBuffer value) {
+ this.updateType = updateType;
+ this.value = value;
+ }
+
+ public UpdateType getUpdateType() {
+ return updateType;
+ }
+
+ public ByteBuffer getBuffer() {
+ return value;
+ }
+
+ public byte[] getData() {
+ return value.array();
+ }
+ }
+
+ private static class Serde implements SerDe<SetRecord> {
+
+ @Override
+ public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException {
+ final UpdateType updateType = newRecordState.getUpdateType();
+ if ( updateType == UpdateType.DELETE ) {
+ out.write(0);
+ } else {
+ out.write(1);
+ }
+
+ final byte[] data = newRecordState.getData();
+ out.writeInt(data.length);
+ out.write(newRecordState.getData());
+ }
+
+ @Override
+ public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException {
+ serializeEdit(null, record, out);
+ }
+
+ @Override
+ public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException {
+ final int value = in.read();
+ if ( value < 0 ) {
+ throw new EOFException();
+ }
+
+ final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE);
+
+ final int size = in.readInt();
+ final byte[] data = new byte[size];
+ in.readFully(data);
+
+ return new SetRecord(updateType, ByteBuffer.wrap(data));
+ }
+
+ @Override
+ public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException {
+ return deserializeEdit(in, new HashMap<Object, SetRecord>(), version);
+ }
+
+ @Override
+ public Object getRecordIdentifier(final SetRecord record) {
+ return record.getBuffer();
+ }
+
+ @Override
+ public UpdateType getUpdateType(final SetRecord record) {
+ return record.getUpdateType();
+ }
+
+ @Override
+ public String getLocation(final SetRecord record) {
+ return null;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
new file mode 100644
index 0000000..bf6ae3e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface SetCache {
+
+ SetCacheResult remove(ByteBuffer value) throws IOException;
+ SetCacheResult addIfAbsent(ByteBuffer value) throws IOException;
+ SetCacheResult contains(ByteBuffer value) throws IOException;
+ void shutdown() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
new file mode 100644
index 0000000..20b6fae
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.distributed.cache.server.CacheRecord;
+
+public class SetCacheRecord extends CacheRecord {
+ private final ByteBuffer value;
+
+ public SetCacheRecord(final ByteBuffer value) {
+ this.value = value;
+ }
+
+ public ByteBuffer getValue() {
+ return value;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if ( this == obj ) {
+ return true;
+ }
+
+ if (obj instanceof SetCacheRecord) {
+ return value.equals(((SetCacheRecord) obj).value);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
new file mode 100644
index 0000000..732c4f0
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+
+
+public class SetCacheResult {
+ private final boolean result;
+ private final SetCacheRecord stats;
+ private final SetCacheRecord evictedRecord;
+
+ public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) {
+ this.result = result;
+ this.stats = stats;
+ this.evictedRecord = evictedRecord;
+ }
+
+ public boolean getResult() {
+ return result;
+ }
+
+ public SetCacheRecord getRecord() {
+ return stats;
+ }
+
+ public SetCacheRecord getEvictedRecord() {
+ return evictedRecord;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java
new file mode 100644
index 0000000..77d6481
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleSetCache implements SetCache {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class);
+
+ private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>();
+ private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap;
+
+ private final String serviceIdentifier;
+
+ private final int maxSize;
+
+ public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
+ inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator());
+ this.serviceIdentifier = serviceIdentifier;
+ this.maxSize = maxSize;
+ }
+
+ private synchronized SetCacheRecord evict() {
+ if ( cache.size() < maxSize ) {
+ return null;
+ }
+
+ final SetCacheRecord recordToEvict = inverseCacheMap.firstKey();
+ final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
+ cache.remove(valueToEvict);
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
+ }
+
+ return recordToEvict;
+ }
+
+ @Override
+ public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) {
+ final SetCacheRecord record = cache.get(value);
+ if ( record == null ) {
+ final SetCacheRecord evicted = evict();
+ final SetCacheRecord newRecord = new SetCacheRecord(value);
+ cache.put(value, newRecord);
+ inverseCacheMap.put(newRecord, value);
+ return new SetCacheResult(true, newRecord, evicted);
+ } else {
+ // We have to remove the record and add it again in order to cause the Map to stay sorted
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, value);
+
+ return new SetCacheResult(false, record, null);
+ }
+ }
+
+ @Override
+ public synchronized SetCacheResult contains(final ByteBuffer value) {
+ final SetCacheRecord record = cache.get(value);
+ if ( record == null ) {
+ return new SetCacheResult(false, null, null);
+ } else {
+ // We have to remove the record and add it again in order to cause the Map to stay sorted
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, value);
+
+ return new SetCacheResult(true, record, null);
+ }
+ }
+
+ @Override
+ public synchronized SetCacheResult remove(final ByteBuffer value) {
+ final SetCacheRecord record = cache.remove(value);
+ if ( record == null ) {
+ return new SetCacheResult(false, null, null);
+ } else {
+ inverseCacheMap.remove(record);
+ return new SetCacheResult(true, record, null);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleSetCache[service id=" + serviceIdentifier + "]";
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..0509c7c
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.distributed.cache.server.DistributedSetCacheServer
+org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html
new file mode 100644
index 0000000..dca3aa1
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html
@@ -0,0 +1,82 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+<meta charset="utf-8" />
+<title>Distributed Map Cache Client Service</title>
+<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+ <h2>Description:</h2>
+
+ <p>A Controller Service that starts an embedded server and listens for connections from clients. The
+ server provides the ability to query the cache, add data to the cache, and remove data from the cache.</p>
+
+
+
+ <p>
+ <strong>Properties:</strong>
+ </p>
+ <p>In the list below, the names of required properties appear
+ in bold. Any other properties (not in bold) are considered optional.
+ If a property has a default value, it is indicated. If a property
+ supports the use of the NiFi Expression Language (or simply,
+ "expression language"), that is also indicated.</p>
+
+ <ul>
+ <li><strong>Port</strong>
+ <ul>
+ <li>The port to listen on for incoming connections</li>
+ <li>Default value: 4557</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li>SSL Context Service
+ <ul>
+ <li>If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure</li>
+ <li>Default value: no default</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Maximum Cache Entries</strong>
+ <ul>
+ <li>The maximum number of cache entries that the cache can hold
+ <li>Default value: 10,000</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Eviction Strategy</strong>
+ <ul>
+ <li>Determines which strategy should be used to evict values from the cache to make room for new entries. Valid values:
+ <code>Least Frequently Used</code>, <code>Least Recently Used</code>, and <code>First In, First Out</code>
+ <li>Default value: Least Frequently Used</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li>Persistence Directory
+ <ul>
+ <li>If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only</li>
+ <li>Default value: no default (in-memory)</li>
+ <li>Supports expression language: true - JVM and System Properties Only</li>
+ </ul></li>
+ </ul>
+
+
+ <i>See Also:</i>
+ <ul>
+ <li><a href="../org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html">Distributed Map Cache Client Service</a></li>
+ <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
+ </ul>
+
+</body>
+</html>