You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:57 UTC

[34/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
new file mode 100644
index 0000000..1d7c94c
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Server Hostname")
+            .description("The name of the server that is running the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Server Port")
+            .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .defaultValue(null)
+            .build();
+    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description(
+                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+    private volatile ConfigurationContext configContext;
+    private volatile boolean closed = false;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(COMMUNICATIONS_TIMEOUT);
+        return descriptors;
+    }
+
+    @OnConfigured
+    public void onConfigured(final ConfigurationContext context) {
+        this.configContext = context;
+    }
+
+    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+        final String hostname = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final CommsSession commsSession;
+        if (sslContextService == null) {
+            commsSession = new StandardCommsSession(hostname, port);
+        } else {
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+        }
+
+        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+        return commsSession;
+    }
+
+    private CommsSession leaseCommsSession() throws IOException {
+        CommsSession session = queue.poll();
+        if (session != null && !session.isClosed()) {
+            return session;
+        }
+
+        session = createCommsSession(configContext);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        try {
+            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+        } catch (final HandshakeException e) {
+            try {
+                session.close();
+            } catch (final IOException ioe) {
+            }
+
+            throw new IOException(e);
+        }
+
+        return session;
+    }
+
+    @Override
+    public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("addIfAbsent", value, serializer);
+    }
+
+    @Override
+    public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("contains", value, serializer);
+    }
+
+    @Override
+    public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("remove", value, serializer);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+
+        CommsSession commsSession;
+        while ((commsSession = queue.poll()) != null) {
+            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+                dos.writeUTF("close");
+                dos.flush();
+                commsSession.close();
+            } catch (final IOException e) {
+            }
+        }
+        logger.info("Closed {}", new Object[] { getIdentifier() });
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!closed)
+            close();
+        logger.debug("Finalize called");
+    }
+
+    private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+
+        final CommsSession session = leaseCommsSession();
+        try {
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF(methodName);
+
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            serializer.serialize(value, baos);
+            dos.writeInt(baos.size());
+            baos.writeTo(dos);
+            dos.flush();
+
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            return dis.readBoolean();
+        } catch (final IOException ioe) {
+            try {
+                session.close();
+            } catch (final IOException ignored) {
+            }
+
+            throw ioe;
+        } finally {
+            if (!session.isClosed()) {
+                if (this.closed) {
+                    try {
+                        session.close();
+                    } catch (final IOException ioe) {
+                    }
+                } else {
+                    queue.offer(session);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
new file mode 100644
index 0000000..c8be082
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+public class SSLCommsSession implements CommsSession {
+    private final SSLSocketChannel sslSocketChannel;
+    private final SSLContext sslContext;
+    private final String hostname;
+    private final int port;
+    
+    private final SSLSocketChannelInputStream in;
+    private final BufferedInputStream bufferedIn;
+    
+    private final SSLSocketChannelOutputStream out;
+    private final BufferedOutputStream bufferedOut;
+
+    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { 
+        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+        
+        in = new SSLSocketChannelInputStream(sslSocketChannel);
+        bufferedIn = new BufferedInputStream(in);
+        
+        out = new SSLSocketChannelOutputStream(sslSocketChannel);
+        bufferedOut = new BufferedOutputStream(out);
+        
+        this.sslContext = sslContext;
+        this.hostname = hostname;
+        this.port = port;
+    }
+    
+    @Override
+    public void interrupt() {
+        sslSocketChannel.interrupt();
+    }
+    
+    @Override
+    public void close() throws IOException {
+        sslSocketChannel.close();
+    }
+
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        sslSocketChannel.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return sslSocketChannel.isClosed();
+    }
+    
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+    
+    @Override
+    public int getPort() {
+        return port;
+    }
+    @Override
+    public SSLContext getSSLContext() {
+        return sslContext;
+    }
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
new file mode 100644
index 0000000..bbe2917
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.io.InterruptableOutputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+public class StandardCommsSession implements CommsSession {
+    private final SocketChannel socketChannel;
+    private final String hostname;
+    private final int port;
+    private volatile long timeoutMillis;
+
+    private final SocketChannelInputStream in;
+    private final InterruptableInputStream bufferedIn;
+
+    private final SocketChannelOutputStream out;
+    private final InterruptableOutputStream bufferedOut;
+
+    public StandardCommsSession(final String hostname, final int port) throws IOException {
+        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+        socketChannel.configureBlocking(false);
+        in = new SocketChannelInputStream(socketChannel);
+        bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
+
+        out = new SocketChannelOutputStream(socketChannel);
+        bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
+
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    @Override
+    public void interrupt() {
+        bufferedIn.interrupt();
+        bufferedOut.interrupt();
+    }
+
+    @Override
+    public void close() throws IOException {
+        socketChannel.close();
+    }
+
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        in.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+        out.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+        timeoutMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    @Override
+    public boolean isClosed() {
+        boolean closed = !socketChannel.isConnected();
+        if (!closed) {
+            try {
+                this.in.isDataAvailable();
+            } catch (IOException e) {
+                try {
+                    close();
+                } catch (IOException e1) {
+                }
+                closed = true;
+            }
+        }
+        return closed;
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public SSLContext getSSLContext() {
+        return null;
+    }
+
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..a91f7ee
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
+org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
new file mode 100644
index 0000000..d5f3595
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
@@ -0,0 +1,78 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+<meta charset="utf-8" />
+<title>Distributed Map Cache Client Service</title>
+<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+	<h2>Description:</h2>
+
+	<p>A Controller Service that can be used to communicate with a
+		Distributed Map Cache Server.</p>
+
+
+
+	<p>
+		<strong>Properties:</strong>
+	</p>
+	<p>In the list below, the names of required properties appear
+		in bold. Any other properties (not in bold) are considered optional.
+		If a property has a default value, it is indicated. If a property
+		supports the use of the NiFi Expression Language (or simply,
+		"expression language"), that is also indicated.</p>
+
+	<ul>
+		<li><strong>Server Hostname</strong>
+			<ul>
+				<li>The name of the server that is running the DistributedMapCacheServer service</li>
+				<li>Default value: no default</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li><strong>Server Port</strong>
+			<ul>
+				<li>The port on the remote server that is to be used when communicating with the 
+				<a href="../nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li>
+				
+				<li>Default value: 4557</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li>SSL Context Service
+			<ul>
+				<li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted
+				<li>Default value: no default</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li><strong>Communications Timeout</strong>
+			<ul>
+				<li>Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received
+				<li>Default value: 30 secs</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+			
+	</ul>
+
+
+	<i>See Also:</i>
+	<ul>
+		<li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li>
+		<li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
+	</ul>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
new file mode 100644
index 0000000..bc612ae
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>distributed-cache-services-bundle</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>distributed-cache-protocol</artifactId>
+	<name>Distributed Cache Protocol</name>
+
+	<description>
+	  	Defines the communications protocol that is used between clients and servers 
+	  	for the Distributed Cache services
+	</description>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>remote-communications-utils</artifactId>
+		</dependency>
+	</dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
new file mode 100644
index 0000000..da2acad
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+
+public class ProtocolHandshake {
+
+    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };
+    
+    public static final int RESOURCE_OK = 20;
+    public static final int DIFFERENT_RESOURCE_VERSION = 21;
+    public static final int ABORT = 255;
+
+    
+    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+        
+        try {
+            dos.write(MAGIC_HEADER);
+            
+            initiateVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+
+    
+    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+        
+        try {
+            final byte[] magicHeaderBuffer = new byte[4];
+            dis.readFully(magicHeaderBuffer);
+            
+            receiveVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+    
+    
+    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        // Write the classname of the RemoteStreamCodec, followed by its version
+        dos.writeInt(negotiator.getVersion());
+        dos.flush();
+        
+        // wait for response from server.
+        final int statusCode = dis.read();
+        switch (statusCode) {
+            case RESOURCE_OK:   // server accepted our proposal of codec name/version
+                return;
+            case DIFFERENT_RESOURCE_VERSION:    // server accepted our proposal of codec name but not the version
+                // Get server's preferred version
+                final int newVersion = dis.readInt();
+                
+                // Determine our new preferred version that is no greater than the server's preferred version.
+                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+                // If we could not agree with server on a version, fail now.
+                if ( newPreference == null ) {
+                    throw new HandshakeException("Could not agree on protocol version");
+                }
+                
+                negotiator.setVersion(newPreference);
+                
+                // Attempt negotiation of resource based on our new preferred version.
+                initiateVersionNegotiation(negotiator, dis, dos);
+            case ABORT:
+                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            default:
+                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
+        }
+    }
+    
+    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        final int version = dis.readInt();
+        if ( negotiator.isVersionSupported(version) ) {
+            dos.write(RESOURCE_OK);
+            dos.flush();
+            
+            negotiator.setVersion(version);
+        } else {
+            final Integer preferred = negotiator.getPreferredVersion(version);
+            if ( preferred == null ) {
+                dos.write(ABORT);
+                dos.flush();
+                throw new HandshakeException("Unable to negotiate an acceptable version of the Distributed Cache Protocol");
+            }
+            dos.write(DIFFERENT_RESOURCE_VERSION);
+            dos.writeInt(preferred);
+            dos.flush();
+            
+            receiveVersionNegotiation(negotiator, dis, dos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
new file mode 100644
index 0000000..8049d42
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol.exception;
+
+public class HandshakeException extends Exception {
+    public HandshakeException(final String message) {
+        super(message);
+    }
+    
+    public HandshakeException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
new file mode 100644
index 0000000..5dec322
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>distributed-cache-services-bundle</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>distributed-cache-server</artifactId>
+
+	<name>Distributed Cache Server</name>
+	<description>Provides a Controller Service for hosting Distributed Caches</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>distributed-cache-protocol</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>remote-communications-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-processor-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-stream-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>ssl-context-service-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>wali</groupId>
+			<artifactId>wali</artifactId>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-client-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>distributed-cache-client-service</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-mock</artifactId>
+		</dependency>
+
+		<dependency>
+		    <groupId>org.apache.nifi</groupId>
+		    <artifactId>ssl-context-service</artifactId>
+		</dependency>
+	</dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
new file mode 100644
index 0000000..9b4e70e
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractCacheServer implements CacheServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
+
+    private final String identifier;
+    private final int port;
+    private final SSLContext sslContext;
+    protected volatile boolean stopped = false;
+    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();;
+
+    private volatile ServerSocketChannel serverSocketChannel;
+
+    public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
+        this.identifier = identifier;
+        this.port = port;
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public void start() throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(true);
+        serverSocketChannel.bind(new InetSocketAddress(port));
+
+        final Runnable runnable = new Runnable() {
+
+            @Override
+            public void run() {
+                while (true) {
+                    final SocketChannel socketChannel;
+                    try {
+                        socketChannel = serverSocketChannel.accept();
+                        logger.debug("Connected to {}", new Object[] { socketChannel });
+                    } catch (final IOException e) {
+                        if (!stopped) {
+                            logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
+                        }
+                        return;
+                    }
+
+                    final Runnable processInputRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+                            final InputStream rawInputStream;
+                            final OutputStream rawOutputStream;
+                            final String peer = socketChannel.socket().getInetAddress().getHostName();
+
+                            try {
+                                if (sslContext == null) {
+                                    rawInputStream = new SocketChannelInputStream(socketChannel);
+                                    rawOutputStream = new SocketChannelOutputStream(socketChannel);
+                                } else {
+                                    final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
+                                    sslSocketChannel.connect();
+                                    rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
+                                    rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
+                                }
+                            } catch (IOException e) {
+                                logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e);
+                                if (logger.isDebugEnabled()) {
+                                    logger.error("", e);
+                                }
+                                try {
+                                    socketChannel.close();
+                                } catch (IOException swallow) {
+                                }
+                               
+                                return;
+                            }
+                            try (final InputStream in = new BufferedInputStream(rawInputStream);
+                                    final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+
+                                final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+
+                                ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
+
+                                boolean continueComms = true;
+                                while (continueComms) {
+                                    continueComms = listen(in, out, versionNegotiator.getVersion());
+                                }
+                                // client has issued 'close'
+                                logger.debug("Client issued close on {}", new Object[] { socketChannel });
+                            } catch (final SocketTimeoutException e) {
+                                logger.debug("30 sec timeout reached", e);
+                            } catch (final IOException | HandshakeException e) {
+                                if (!stopped) {
+                                    logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() });
+                                    if (logger.isDebugEnabled()) {
+                                        logger.error("", e);
+                                    }
+                                }
+                            } finally {
+                                processInputThreads.remove(Thread.currentThread());
+                            }
+                        }
+                    };
+
+                    final Thread processInputThread = new Thread(processInputRunnable);
+                    processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
+                    processInputThread.setDaemon(true);
+                    processInputThread.start();
+                    processInputThreads.add(processInputThread);
+                }
+            }
+        };
+
+        final Thread thread = new Thread(runnable);
+        thread.setDaemon(true);
+        thread.setName("Distributed Cache Server: " + identifier);
+        thread.start();
+    }
+
+    @Override
+    public void stop() throws IOException {
+        stopped = true;
+        logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
+
+        if (serverSocketChannel != null) {
+            serverSocketChannel.close();
+        }
+        // need to close out the created SocketChannels...this is done by interrupting
+        // the created threads that loop on listen().
+        for (Thread processInputThread : processInputThreads) {
+            processInputThread.interrupt();
+            int i = 0;
+            while (!processInputThread.isInterrupted() && i++ < 5) {
+                try {
+                    Thread.sleep(50); // allow thread to gracefully terminate
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+        processInputThreads.clear();
+    }
+
+    @Override
+    public String toString() {
+        return "CacheServer[id=" + identifier + "]";
+    }
+
+    /**
+     * Listens for incoming data and communicates with remote peer
+     * 
+     * @param in
+     * @param out
+     * @param version
+     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
+     * @throws IOException
+     */
+    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
new file mode 100644
index 0000000..71ac56d
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CacheRecord {
+
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+    
+    private final long id;
+    private final long entryDate;
+    private volatile long lastHitDate;
+    private final AtomicInteger hitCount = new AtomicInteger(0);
+    
+    public CacheRecord() {
+        entryDate = System.currentTimeMillis();
+        lastHitDate = entryDate;
+        id = idGenerator.getAndIncrement();
+    }
+    
+    public long getEntryDate() {
+        return entryDate;
+    }
+    
+    public long getLastHitDate() {
+        return lastHitDate;
+    }
+    
+    public int getHitCount() {
+        return hitCount.get();
+    }
+
+    public void hit() {
+        hitCount.getAndIncrement();
+        lastHitDate = System.currentTimeMillis();
+    }
+    
+    public long getId() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
new file mode 100644
index 0000000..2c85cd8
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+
+public interface CacheServer {
+
+    void start() throws IOException;
+    void stop() throws IOException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
new file mode 100644
index 0000000..0f962d0
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.processor.annotation.OnShutdown;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+public abstract class DistributedCacheServer extends AbstractControllerService {
+    public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
+    public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
+    public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Port")
+            .description("The port to listen on for incoming connections")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .build();
+    public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
+            .name("Maximum Cache Entries")
+            .description("The maximum number of cache entries that the cache can hold")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .build();
+    public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
+            .name("Eviction Strategy")
+            .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
+            .required(true)
+            .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
+            .defaultValue(EVICTION_STRATEGY_LFU)
+            .build();
+    public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
+            .name("Persistence Directory")
+            .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
+            .required(false)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+            .build();
+
+    private volatile CacheServer cacheServer;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PORT);
+        properties.add(MAX_CACHE_ENTRIES);
+        properties.add(EVICTION_POLICY);
+        properties.add(PERSISTENCE_PATH);
+        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
+                getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+        return properties;
+    }
+
+    @OnConfigured
+    public void startServer(final ConfigurationContext context) throws IOException {
+        if (cacheServer == null) {
+            cacheServer = createCacheServer(context);
+            cacheServer.start();
+        }
+    }
+
+    @OnShutdown
+    public void shutdownServer() throws IOException {
+        if (cacheServer != null) {
+            cacheServer.stop();
+        }
+        cacheServer = null;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        shutdownServer();
+    }
+
+    protected abstract CacheServer createCacheServer(ConfigurationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
new file mode 100644
index 0000000..426573f
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+public class DistributedSetCacheServer extends DistributedCacheServer {
+
+    @Override
+    protected CacheServer createCacheServer(final ConfigurationContext context) {
+        final int port = context.getProperty(PORT).asInteger();
+        final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+        
+        final SSLContext sslContext;
+        if ( sslContextService == null ) {
+            sslContext = null;
+        } else {
+            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
+        }
+        
+        final EvictionPolicy evictionPolicy;
+        switch (evictionPolicyName) {
+            case EVICTION_STRATEGY_FIFO:
+                evictionPolicy = EvictionPolicy.FIFO;
+                break;
+            case EVICTION_STRATEGY_LFU:
+                evictionPolicy = EvictionPolicy.LFU;
+                break;
+            case EVICTION_STRATEGY_LRU:
+                evictionPolicy = EvictionPolicy.LRU;
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+        }
+        
+        try {
+            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
+            
+            return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
new file mode 100644
index 0000000..60bd2c1
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.Comparator;
+
+public enum EvictionPolicy {
+    LFU(new LFUComparator()),
+    LRU(new LRUComparator()),
+    FIFO(new FIFOComparator());
+    
+    private final Comparator<CacheRecord> comparator;
+    
+    private EvictionPolicy(final Comparator<CacheRecord> comparator) {
+        this.comparator = comparator;
+    }
+    
+    public Comparator<CacheRecord> getComparator() {
+        return comparator;
+    }
+    
+    public static class LFUComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if ( o1.equals(o2) ) {
+                return 0;
+            }
+            
+            final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount());
+            final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison;
+            return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
+        }
+    }
+    
+    public static class LRUComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if ( o1.equals(o2) ) {
+                return 0;
+            }
+
+            final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate());
+            return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison);
+        }
+    }
+    
+    public static class FIFOComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if ( o1.equals(o2) ) {
+                return 0;
+            }
+
+            final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate());
+            return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
new file mode 100644
index 0000000..5d2c0f6
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
+import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
+import org.apache.nifi.io.DataOutputStream;
+
+public class SetCacheServer extends AbstractCacheServer {
+
+    private final SetCache cache;
+
+    public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+            final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+        super(identifier, sslContext, port);
+
+        final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
+
+        if (persistencePath == null) {
+            this.cache = simpleCache;
+        } else {
+            final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
+            persistentCache.restore();
+            this.cache = persistentCache;
+        }
+    }
+
+    @Override
+    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+
+        final String action = dis.readUTF();
+        if (action.equals("close")) {
+            return false;
+        }
+
+        final int valueLength = dis.readInt();
+        final byte[] value = new byte[valueLength];
+        dis.readFully(value);
+        final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
+
+        final SetCacheResult response;
+        switch (action) {
+        case "addIfAbsent":
+            response = cache.addIfAbsent(valueBuffer);
+            break;
+        case "contains":
+            response = cache.contains(valueBuffer);
+            break;
+        case "remove":
+            response = cache.remove(valueBuffer);
+            break;
+        default:
+            throw new IOException("IllegalRequest");
+        }
+
+        dos.writeBoolean(response.getResult());
+        dos.flush();
+
+        return true;
+    }
+
+    @Override
+    public void stop() throws IOException {
+        try {
+            super.stop();
+        } finally {
+            cache.shutdown();
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!stopped)
+            stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
new file mode 100644
index 0000000..920529d
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.server.CacheServer;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+public class DistributedMapCacheServer extends DistributedCacheServer {
+
+    @Override
+    protected CacheServer createCacheServer(final ConfigurationContext context) {
+        final int port = context.getProperty(PORT).asInteger();
+        final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+        
+        final SSLContext sslContext;
+        if ( sslContextService == null ) {
+            sslContext = null;
+        } else {
+            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
+        }
+        
+        final EvictionPolicy evictionPolicy;
+        switch (evictionPolicyName) {
+            case EVICTION_STRATEGY_FIFO:
+                evictionPolicy = EvictionPolicy.FIFO;
+                break;
+            case EVICTION_STRATEGY_LFU:
+                evictionPolicy = EvictionPolicy.LFU;
+                break;
+            case EVICTION_STRATEGY_LRU:
+                evictionPolicy = EvictionPolicy.LRU;
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+        }
+        
+        try {
+            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
+            
+            return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
new file mode 100644
index 0000000..534cb0b
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface MapCache {
+
+    MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+    boolean containsKey(ByteBuffer key) throws IOException;
+    ByteBuffer get(ByteBuffer key) throws IOException;
+    ByteBuffer remove(ByteBuffer key) throws IOException;
+    void shutdown() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
new file mode 100644
index 0000000..b0ab0c4
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.distributed.cache.server.CacheRecord;
+
+public class MapCacheRecord extends CacheRecord {
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    
+    public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
+        this.key = key;
+        this.value = value;
+    }
+    
+    public ByteBuffer getKey() {
+        return key;
+    }
+    
+    public ByteBuffer getValue() {
+        return value;
+    }
+    
+    @Override
+    public int hashCode() {
+        return 2938476 + key.hashCode() * value.hashCode();
+    }
+    
+    @Override
+    public boolean equals(final Object obj) {
+        if ( obj == this ) {
+            return true;
+        }
+        
+        if ( obj instanceof MapCacheRecord ) {
+            final MapCacheRecord that = ((MapCacheRecord) obj);
+            return key.equals(that.key) && value.equals(that.value);
+        }
+        
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
new file mode 100644
index 0000000..3e8dd0e
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.io.DataOutputStream;
+
+public class MapCacheServer extends AbstractCacheServer {
+
+    private final MapCache cache;
+
+    public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+            final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+        super(identifier, sslContext, port);
+
+        final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
+
+        if (persistencePath == null) {
+            this.cache = simpleCache;
+        } else {
+            final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
+            persistentCache.restore();
+            this.cache = persistentCache;
+        }
+    }
+
+    @Override
+    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+        final String action = dis.readUTF();
+        try {
+            switch (action) {
+            case "close": {
+                return false;
+            }
+            case "putIfAbsent": {
+                final byte[] key = readValue(dis);
+                final byte[] value = readValue(dis);
+                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                dos.writeBoolean(putResult.isSuccessful());
+                break;
+            }
+            case "containsKey": {
+                final byte[] key = readValue(dis);
+                final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
+                dos.writeBoolean(contains);
+                break;
+            }
+            case "getAndPutIfAbsent": {
+                final byte[] key = readValue(dis);
+                final byte[] value = readValue(dis);
+
+                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                if (putResult.isSuccessful()) {
+                    // Put was successful. There was no old value to get.
+                    dos.writeInt(0);
+                } else {
+                    // we didn't put. Write back the previous value
+                    final byte[] byteArray = putResult.getExistingValue().array();
+                    dos.writeInt(byteArray.length);
+                    dos.write(byteArray);
+                }
+
+                break;
+            }
+            case "get": {
+                final byte[] key = readValue(dis);
+                final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+                if (existingValue == null) {
+                    // there was no existing value; we did a "put".
+                    dos.writeInt(0);
+                } else {
+                    // a value already existed. we did not update the map
+                    final byte[] byteArray = existingValue.array();
+                    dos.writeInt(byteArray.length);
+                    dos.write(byteArray);
+                }
+
+                break;
+            }
+            case "remove": {
+                final byte[] key = readValue(dis);
+                final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
+                dos.writeBoolean(removed);
+                break;
+            }
+            default: {
+                throw new IOException("Illegal Request");
+            }
+            }
+        } finally {
+            dos.flush();
+        }
+
+        return true;
+    }
+
+    @Override
+    public void stop() throws IOException {
+        try {
+            super.stop();
+        } finally {
+            cache.shutdown();
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!stopped)
+            stop();
+    }
+
+    private byte[] readValue(final DataInputStream dis) throws IOException {
+        final int numBytes = dis.readInt();
+        final byte[] buffer = new byte[numBytes];
+        dis.readFully(buffer);
+        return buffer;
+    }
+
+}