You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/15 11:14:50 UTC

[05/12] incubator-nifi git commit: NIFI-169 well it finally all builds. There is a classpath issue still to sort out which impacts startup

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
new file mode 100644
index 0000000..5bead8c
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Map Map}.
+ *
+ */
+public interface DistributedMapCacheClient extends ControllerService {
+
+    /**
+     * Adds the specified key and value to the cache, if they are not already
+     * present, serializing the key and value with the given
+     * {@link Serializer}s.
+     *
+     * @param <K> the type of the key
+     * @param <V> the type of the value
+     * @param key the key to add to the map
+     * @param value the value to add to the map if and only if the key is absent
+     * @param keySerializer the serializer used to serialize the key
+     * @param valueSerializer the serializer used to serialize the value
+     * @return true if the value was added to the cache, false if the value
+     * already existed in the cache
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
+
+    /**
+     * Adds the specified key and value to the cache, if they are not already
+     * present, serializing the key and value with the given
+     * {@link Serializer}s. If a value already exists in the cache for the given
+     * key, the value associated with the key is returned, after being
+     * deserialized with the given valueDeserializer.
+     *
+     * @param <K> the type of the key
+     * @param <V> the type of the value
+     * @param key the key to add to the map
+     * @param value the value to add to the map if and only if the key is absent
+     * @param keySerializer the serializer used to serialize the key
+     * @param valueSerializer the serializer used to serialize the value
+     * @param valueDeserializer the deserializer used to deserialize the value that already exists in the cache
+     * @return the value already cached for the key; what is returned when the key was absent is not specified here (TODO confirm)
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException;
+
+    /**
+     * Determines if the given key is present in the cache and if so returns
+     * <code>true</code>, else returns <code>false</code>
+     *
+     * @param <K> the type of the key
+     * @param key the key to look up in the map
+     * @param keySerializer the serializer used to serialize the key
+     * @return <code>true</code> if the key is present in the cache, <code>false</code> otherwise
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
+
+    /**
+     * Returns the value in the cache for the given key, if one exists;
+     * otherwise returns <code>null</code>
+     *
+     * @param <K> the type of the key
+     * @param <V> the type of the value
+     * @param key the key to lookup in the map
+     * @param keySerializer the serializer used to serialize the key
+     * @param valueDeserializer the deserializer used to deserialize the cached value
+     *
+     * @return the value in the cache for the given key, or <code>null</code> if none exists
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
+
+    /**
+     * Attempts to notify the server that we are finished communicating with it
+     * and cleans up resources
+     * @throws java.io.IOException if an I/O error occurs while closing
+     */
+    void close() throws IOException;
+
+    /**
+     * Removes the entry with the given key from the cache, if it is present.
+     *
+     * @param <K> the type of the key
+     * @param key the key of the entry to remove
+     * @param serializer the serializer used to serialize the key
+     * @return <code>true</code> if the entry is removed, <code>false</code> if
+     * the key did not exist in the cache
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K> boolean remove(K key, Serializer<K> serializer) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
new file mode 100644
index 0000000..12aae3e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Set Set}.
+ */
+public interface DistributedSetCacheClient extends ControllerService {
+
+    /**
+     * Adds the specified value to the cache, serializing the value with the
+     * given {@link Serializer}.
+     *
+     * @param <T> the type of the value
+     * @param value the value to add to the set
+     * @param serializer the serializer used to serialize the value
+     * @return true if the value was added to the cache, false if the value
+     * already existed in the cache
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <T> boolean addIfAbsent(T value, Serializer<T> serializer) throws IOException;
+
+    /**
+     * Determines if the given value is present in the cache and if so returns
+     * <code>true</code>, else returns <code>false</code>
+     *
+     * @param <T> the type of the value
+     * @param value the value to look for in the set
+     * @param serializer the serializer used to serialize the value
+     * @return <code>true</code> if the value is present in the cache, <code>false</code> otherwise
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <T> boolean contains(T value, Serializer<T> serializer) throws IOException;
+
+    /**
+     * Removes the given value from the cache, if it is present.
+     *
+     * @param <T> the type of the value
+     * @param value the value to remove from the set
+     * @param serializer the serializer used to serialize the value
+     * @return <code>true</code> if the value is removed, <code>false</code> if
+     * the value did not exist in the cache
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <T> boolean remove(T value, Serializer<T> serializer) throws IOException;
+
+    /**
+     * Attempts to notify the server that we are finished communicating with it
+     * and cleans up resources
+     * @throws java.io.IOException if an I/O error occurs while closing
+     */
+    void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
new file mode 100644
index 0000000..f1896be
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+/**
+ * Provides a mechanism by which a value can be serialized to a stream of bytes
+ * @param <T> the type of value that this serializer accepts
+ */
+public interface Serializer<T> {
+
+    /**
+     * Serializes the given value to the {@link OutputStream}
+     *
+     * @param value the value to serialize
+     * @param output the stream to which the serialized bytes are written
+     * @throws SerializationException If unable to serialize the given value
+     * @throws java.io.IOException if unable to write to the given stream
+     */
+    void serialize(T value, OutputStream output) throws SerializationException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
new file mode 100644
index 0000000..bb2fcb2
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.exception;
+
+/**
+ * Unchecked exception type of the distributed cache client API. Judging by its
+ * name it reports a failure to deserialize a value; the sites that throw it are
+ * not part of this file.
+ */
+public class DeserializationException extends RuntimeException {
+
+    /**
+     * Creates an exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the failure that triggered this exception
+     */
+    public DeserializationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that carries only a detail message.
+     *
+     * @param message the detail message
+     */
+    public DeserializationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps the given cause; the detail message is
+     * derived from the cause by {@link RuntimeException#RuntimeException(Throwable)}.
+     *
+     * @param cause the failure that triggered this exception
+     */
+    public DeserializationException(final Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
new file mode 100644
index 0000000..aac59f5
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.exception;
+
+/**
+ * Unchecked exception type of the distributed cache client API. A
+ * <code>Serializer</code> declares it for the case where it is unable to
+ * serialize the value it was given.
+ */
+public class SerializationException extends RuntimeException {
+
+    /**
+     * Creates an exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the failure that triggered this exception
+     */
+    public SerializationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that carries only a detail message.
+     *
+     * @param message the detail message
+     */
+    public SerializationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps the given cause; the detail message is
+     * derived from the cause by {@link RuntimeException#RuntimeException(Throwable)}.
+     *
+     * @param cause the failure that triggered this exception
+     */
+    public SerializationException(final Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
new file mode 100644
index 0000000..a251393
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
@@ -0,0 +1,60 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>distributed-cache-services-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>distributed-cache-client-service</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Distributed Cache Client Service</name>
+    <description>Provides a Client for interfacing with a Distributed Cache</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>remote-communications-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-stream-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>ssl-context-service-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
new file mode 100644
index 0000000..f838c2f
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * A closeable communications session. DistributedMapCacheClientService writes
+ * its requests to {@link #getOutputStream()} and reads the responses from
+ * {@link #getInputStream()}.
+ */
+public interface CommsSession extends Closeable {
+
+    /**
+     * Sets the timeout of this session.
+     * NOTE(review): presumably applied to blocking reads/writes - confirm
+     * against the implementations.
+     *
+     * @param value the timeout, expressed in <code>timeUnit</code>
+     * @param timeUnit the unit of <code>value</code>
+     */
+    void setTimeout(final long value, final TimeUnit timeUnit);
+    
+    /**
+     * @return the stream from which data is read for this session
+     * @throws IOException if the stream cannot be obtained
+     */
+    InputStream getInputStream() throws IOException;
+    
+    /**
+     * @return the stream to which data is written for this session
+     * @throws IOException if the stream cannot be obtained
+     */
+    OutputStream getOutputStream() throws IOException;
+    
+    /**
+     * @return <code>true</code> if this session is closed; callers use this to
+     * decide whether a session may still be reused
+     */
+    boolean isClosed();
+    
+    /**
+     * NOTE(review): semantics are not visible in this file; presumably aborts
+     * any in-progress blocking I/O - confirm against the implementations.
+     */
+    void interrupt();
+    
+    /**
+     * @return the hostname associated with this session (presumably that of
+     * the remote server - TODO confirm)
+     */
+    String getHostname();
+    
+    /**
+     * @return the port associated with this session (presumably that of the
+     * remote server - TODO confirm)
+     */
+    int getPort();
+    
+    /**
+     * @param timeUnit the unit in which the timeout should be expressed
+     * @return the timeout of this session, expressed in <code>timeUnit</code>
+     */
+    long getTimeout(TimeUnit timeUnit);
+    
+    /**
+     * @return the SSLContext of this session (presumably <code>null</code>
+     * for an unencrypted session - TODO confirm)
+     */
+    SSLContext getSSLContext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
new file mode 100644
index 0000000..ee96660
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controller service that implements {@link DistributedMapCacheClient} by
+ * exchanging length-delimited requests and responses with a remote server over
+ * pooled {@link CommsSession}s.
+ * <p>
+ * A session is leased from an internal queue for the duration of one request
+ * and is only handed back to the queue when that request completed normally.
+ * A session on which a request failed for any reason is closed instead, so a
+ * partially written request or partially read response is never reused.
+ */
+public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Server Hostname")
+            .description("The name of the server that is running the DistributedMapCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Server Port")
+            .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .defaultValue(null)
+            .build();
+    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description(
+                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    /** Idle sessions that are available for reuse by the next request. */
+    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+    /** Configuration captured by {@link #cacheConfig(ConfigurationContext)}; used to open new sessions. */
+    private volatile ConfigurationContext configContext;
+    /** Set once {@link #close()} has been called; further requests are rejected. */
+    private volatile boolean closed = false;
+
+    /**
+     * @return the properties supported by this service: hostname, port, SSL
+     * context service and communications timeout
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(COMMUNICATIONS_TIMEOUT);
+        return descriptors;
+    }
+
+    /**
+     * Remembers the configuration so that sessions can be created lazily when
+     * the first request is made.
+     *
+     * @param context the configuration of this service
+     */
+    @OnConfigured
+    public void cacheConfig(final ConfigurationContext context) {
+        this.configContext = context;
+    }
+
+    /**
+     * Sends a "putIfAbsent" request carrying the serialized key and value and
+     * returns the boolean that the server sends back.
+     */
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
+            throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("putIfAbsent");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+
+                dos.flush();
+
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    /**
+     * Sends a "containsKey" request carrying the serialized key and returns
+     * the boolean that the server sends back.
+     */
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("containsKey");
+
+                serialize(key, keySerializer, dos);
+                dos.flush();
+
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    /**
+     * Sends a "getAndPutIfAbsent" request carrying the serialized key and
+     * value, then hands the length-delimited response to the given
+     * deserializer and returns its result.
+     */
+    @Override
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+            final Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<V>() {
+            @Override
+            public V execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("getAndPutIfAbsent");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                return valueDeserializer.deserialize(responseBuffer);
+            }
+        });
+    }
+
+    /**
+     * Sends a "get" request carrying the serialized key, then hands the
+     * length-delimited response to the given deserializer and returns its
+     * result.
+     */
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<V>() {
+            @Override
+            public V execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("get");
+
+                serialize(key, keySerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                return valueDeserializer.deserialize(responseBuffer);
+            }
+        });
+    }
+
+    /**
+     * Sends a "remove" request carrying the serialized key and returns the
+     * boolean that the server sends back.
+     */
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("remove");
+
+                serialize(key, serializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    /**
+     * Reads a 4-byte length followed by that many bytes.
+     *
+     * @param dis the stream to read the response from
+     * @return the payload of the response; empty when the length is zero
+     * @throws IOException if the stream ends early, cannot be read, or
+     * announces a negative length
+     */
+    private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
+        final int responseLength = dis.readInt();
+        if (responseLength < 0) {
+            // A negative length can only come from a corrupt or out-of-sync stream; report it as a
+            // communications failure rather than letting the array allocation blow up.
+            throw new IOException("Received invalid response length " + responseLength + " from remote instance");
+        }
+
+        final byte[] responseBuffer = new byte[responseLength];
+        dis.readFully(responseBuffer);
+        return responseBuffer;
+    }
+
+    /**
+     * Opens a new session to the configured server. An SSL session is created
+     * when an SSL Context Service is configured, otherwise a standard session;
+     * the configured communications timeout is applied in both cases.
+     *
+     * @param context the configuration to read hostname, port, timeout and
+     * SSL Context Service from
+     * @return the newly created session; no handshake has been performed yet
+     * @throws IOException if the session cannot be established
+     */
+    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+        final String hostname = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final CommsSession commsSession;
+        if (sslContextService == null) {
+            commsSession = new StandardCommsSession(hostname, port);
+        } else {
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+        }
+
+        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+        return commsSession;
+    }
+
+    /**
+     * Returns an idle session from the queue if one is available and still
+     * open; otherwise opens a new session and performs the protocol handshake
+     * on it.
+     *
+     * @return a session that is ready to carry a request
+     * @throws IOException if a new session cannot be created or the handshake
+     * fails; a HandshakeException is wrapped in an IOException
+     */
+    private CommsSession leaseCommsSession() throws IOException {
+        final CommsSession pooled = queue.poll();
+        if (pooled != null && !pooled.isClosed()) {
+            return pooled;
+        }
+
+        final CommsSession session = createCommsSession(configContext);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        boolean handshakeComplete = false;
+        try {
+            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+            handshakeComplete = true;
+        } catch (final HandshakeException e) {
+            throw new IOException(e);
+        } finally {
+            // Whatever made the handshake fail (HandshakeException, IOException, ...), the freshly
+            // opened session is unusable and must not be leaked.
+            if (!handshakeComplete) {
+                closeQuietly(session);
+            }
+        }
+
+        return session;
+    }
+
+    /**
+     * Marks this client as closed and, for every idle session, makes a
+     * best-effort attempt to send a "close" request before closing the
+     * session. Failures for individual sessions are logged at debug level and
+     * do not stop the remaining sessions from being closed.
+     *
+     * @throws IOException declared by the interface; failures on individual
+     * sessions are not propagated
+     */
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+
+        CommsSession commsSession;
+        while ((commsSession = queue.poll()) != null) {
+            try {
+                final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream());
+                dos.writeUTF("close");
+                dos.flush();
+            } catch (final IOException e) {
+                logger.debug("Unable to notify remote instance that the session is being closed", e);
+            } finally {
+                // Close the session even when the notification could not be sent.
+                closeQuietly(commsSession);
+            }
+        }
+        logger.info("Closed {}", new Object[] { getIdentifier() });
+    }
+
+    /**
+     * Safety net that closes this client if it was never closed explicitly.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            if (!closed) {
+                close();
+            }
+            logger.debug("Finalize called");
+        } finally {
+            super.finalize();
+        }
+    }
+
+    /**
+     * Serializes the value into a temporary buffer and writes it to the
+     * stream as a 4-byte length followed by the serialized bytes.
+     *
+     * @param value the value to serialize
+     * @param serializer the serializer to use
+     * @param dos the stream to write the length and bytes to
+     * @throws IOException if the serializer or the stream fails
+     */
+    private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serializer.serialize(value, baos);
+        dos.writeInt(baos.size());
+        baos.writeTo(dos);
+    }
+
+    /**
+     * Leases a session, runs the action on it and afterwards either returns
+     * the session to the queue or closes it.
+     * <p>
+     * The session is only returned to the queue when the action completed
+     * normally and this client has not been closed in the meantime. If the
+     * action throws anything - including the unchecked SerializationException
+     * and DeserializationException - the session is closed, because the
+     * request may have been written only partially.
+     *
+     * @param action the request to perform
+     * @return whatever the action returns
+     * @throws IOException if the session cannot be leased or the action fails
+     * @throws IllegalStateException if this client has been closed
+     */
+    private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+
+        final CommsSession session = leaseCommsSession();
+        boolean completed = false;
+        try {
+            final T result = action.execute(session);
+            completed = true;
+            return result;
+        } finally {
+            if (!session.isClosed()) {
+                if (completed && !this.closed) {
+                    queue.offer(session);
+                } else {
+                    closeQuietly(session);
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the session, logging instead of propagating any failure.
+     *
+     * @param session the session to close
+     */
+    private static void closeQuietly(final CommsSession session) {
+        try {
+            session.close();
+        } catch (final IOException e) {
+            logger.debug("Failed to close communications session cleanly", e);
+        }
+    }
+
+    /**
+     * A single request/response exchange performed on a leased session.
+     */
+    private static interface CommsAction<T> {
+        T execute(CommsSession commsSession) throws IOException;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
new file mode 100644
index 0000000..1d7c94c
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controller service that acts as a client of a remote DistributedSetCacheServer.
+ * <p>
+ * Every operation is sent over a {@link CommsSession}. Sessions that are still
+ * open after an operation are kept in a queue and reused by later calls.
+ */
+public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedSetCacheClientService.class);
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Server Hostname")
+            .description("The name of the server that is running the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Server Port")
+            .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .defaultValue(null)
+            .build();
+    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description(
+                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    /** Sessions that are currently idle and may be reused by the next operation. */
+    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+    /** Configuration captured by {@link #onConfigured(ConfigurationContext)}; used to open new sessions. */
+    private volatile ConfigurationContext configContext;
+    /** Set once {@link #close()} has been called; no further operations are accepted afterwards. */
+    private volatile boolean closed = false;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(COMMUNICATIONS_TIMEOUT);
+        return descriptors;
+    }
+
+    /**
+     * Remembers the configuration so that sessions can be created later on demand.
+     *
+     * @param context the configuration of this service
+     */
+    @OnConfigured
+    public void onConfigured(final ConfigurationContext context) {
+        this.configContext = context;
+    }
+
+    /**
+     * Opens a new session to the configured host and port and applies the
+     * configured communications timeout to it. An SSL session is created when
+     * an SSL Context Service is configured, a plain session otherwise.
+     *
+     * @param context the configuration to read the connection properties from
+     * @return the newly created session (no protocol handshake has been performed yet)
+     * @throws IOException if the session cannot be established
+     */
+    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+        final String hostname = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final CommsSession commsSession;
+        if (sslContextService == null) {
+            commsSession = new StandardCommsSession(hostname, port);
+        } else {
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+        }
+
+        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+        return commsSession;
+    }
+
+    /**
+     * Returns a session that is ready for use: either an idle one taken from the
+     * queue or, if none is available or the polled one is closed, a new session
+     * on which the protocol handshake has been completed.
+     *
+     * @return an open session
+     * @throws IOException if a new session cannot be created or the handshake fails
+     */
+    private CommsSession leaseCommsSession() throws IOException {
+        CommsSession session = queue.poll();
+        if (session != null && !session.isClosed()) {
+            return session;
+        }
+
+        session = createCommsSession(configContext);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        try {
+            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+        } catch (final HandshakeException e) {
+            closeQuietly(session);
+            throw new IOException(e);
+        } catch (final IOException e) {
+            // Previously only HandshakeException released the session, so an I/O
+            // failure during the handshake leaked the freshly opened connection.
+            closeQuietly(session);
+            throw e;
+        }
+
+        return session;
+    }
+
+    @Override
+    public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("addIfAbsent", value, serializer);
+    }
+
+    @Override
+    public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("contains", value, serializer);
+    }
+
+    @Override
+    public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("remove", value, serializer);
+    }
+
+    /**
+     * Marks this client as closed and shuts down every idle session: a "close"
+     * command is sent to the server (best effort) and the session is then closed
+     * regardless of whether sending the command succeeded.
+     */
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+
+        CommsSession commsSession;
+        while ((commsSession = queue.poll()) != null) {
+            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+                dos.writeUTF("close");
+                dos.flush();
+            } catch (final IOException e) {
+                // Best effort only: the server may already have dropped the connection.
+                logger.debug("Failed to notify server that session is closing", e);
+            } finally {
+                // Always release the connection, even if the notification failed.
+                closeQuietly(commsSession);
+            }
+        }
+        logger.info("Closed {}", new Object[] { getIdentifier() });
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            if (!closed) {
+                close();
+            }
+            logger.debug("Finalize called");
+        } finally {
+            super.finalize();
+        }
+    }
+
+    /**
+     * Sends a single request to the server and reads its boolean answer.
+     * <p>
+     * The request consists of the method name (written with {@code writeUTF}),
+     * the length of the serialized value as an int, and the serialized value.
+     * On an {@link IOException} the session is closed; otherwise a session that
+     * is still open is offered back to the queue, or closed if this client has
+     * been closed in the meantime.
+     *
+     * @param methodName the name of the remote operation to invoke
+     * @param value the value the operation applies to
+     * @param serializer used to convert the value to bytes
+     * @param <T> the type of the value
+     * @return the boolean sent back by the server
+     * @throws IOException if communication with the server fails
+     * @throws IllegalStateException if this client has already been closed
+     */
+    private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+
+        final CommsSession session = leaseCommsSession();
+        try {
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF(methodName);
+
+            // Serialize into a buffer first so that the length can be sent ahead of the payload.
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            serializer.serialize(value, baos);
+            dos.writeInt(baos.size());
+            baos.writeTo(dos);
+            dos.flush();
+
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            return dis.readBoolean();
+        } catch (final IOException ioe) {
+            // The connection is in an unknown state; make sure it is not reused.
+            closeQuietly(session);
+            throw ioe;
+        } finally {
+            if (!session.isClosed()) {
+                if (this.closed) {
+                    closeQuietly(session);
+                } else {
+                    queue.offer(session);
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the given session, logging instead of propagating any failure.
+     *
+     * @param session the session to close
+     */
+    private static void closeQuietly(final CommsSession session) {
+        try {
+            session.close();
+        } catch (final IOException ioe) {
+            logger.debug("Failed to close communications session cleanly", ioe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
new file mode 100644
index 0000000..c8be082
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+/**
+ * A {@link CommsSession} that communicates over an {@link SSLSocketChannel}.
+ * The input and output streams handed out are buffered wrappers around the
+ * channel's streams.
+ */
+public class SSLCommsSession implements CommsSession {
+    private final SSLSocketChannel sslSocketChannel;
+    private final SSLContext sslContext;
+    private final String hostname;
+    private final int port;
+
+    private final SSLSocketChannelInputStream in;
+    private final BufferedInputStream bufferedIn;
+
+    private final SSLSocketChannelOutputStream out;
+    private final BufferedOutputStream bufferedOut;
+
+    /**
+     * Creates the SSL channel for the given host and port and wraps its streams.
+     *
+     * @param sslContext the SSL context used to create the channel
+     * @param hostname the remote host
+     * @param port the remote port
+     * @throws IOException if the channel cannot be created
+     */
+    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
+        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+
+        in = new SSLSocketChannelInputStream(sslSocketChannel);
+        bufferedIn = new BufferedInputStream(in);
+
+        out = new SSLSocketChannelOutputStream(sslSocketChannel);
+        bufferedOut = new BufferedOutputStream(out);
+
+        this.sslContext = sslContext;
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    @Override
+    public void interrupt() {
+        sslSocketChannel.interrupt();
+    }
+
+    @Override
+    public void close() throws IOException {
+        sslSocketChannel.close();
+    }
+
+    /**
+     * Sets the timeout of the underlying channel. The channel takes the timeout
+     * as an int number of milliseconds, so values that do not fit are clamped to
+     * {@link Integer#MAX_VALUE} instead of silently wrapping around.
+     */
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+        sslSocketChannel.setTimeout((int) Math.min(millis, Integer.MAX_VALUE));
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return sslSocketChannel.isClosed();
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public SSLContext getSSLContext() {
+        return sslContext;
+    }
+
+    /**
+     * Returns the timeout currently set on the underlying channel, converted
+     * from milliseconds to the requested unit.
+     */
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
new file mode 100644
index 0000000..bbe2917
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.io.InterruptableOutputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+public class StandardCommsSession implements CommsSession {
+    private final SocketChannel socketChannel;
+    private final String hostname;
+    private final int port;
+    private volatile long timeoutMillis;
+
+    private final SocketChannelInputStream in;
+    private final InterruptableInputStream bufferedIn;
+
+    private final SocketChannelOutputStream out;
+    private final InterruptableOutputStream bufferedOut;
+
+    public StandardCommsSession(final String hostname, final int port) throws IOException {
+        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+        socketChannel.configureBlocking(false);
+        in = new SocketChannelInputStream(socketChannel);
+        bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
+
+        out = new SocketChannelOutputStream(socketChannel);
+        bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
+
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    @Override
+    public void interrupt() {
+        bufferedIn.interrupt();
+        bufferedOut.interrupt();
+    }
+
+    @Override
+    public void close() throws IOException {
+        socketChannel.close();
+    }
+
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        in.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+        out.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+        timeoutMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    @Override
+    public boolean isClosed() {
+        boolean closed = !socketChannel.isConnected();
+        if (!closed) {
+            try {
+                this.in.isDataAvailable();
+            } catch (IOException e) {
+                try {
+                    close();
+                } catch (IOException e1) {
+                }
+                closed = true;
+            }
+        }
+        return closed;
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public SSLContext getSSLContext() {
+        return null;
+    }
+
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..a91f7ee
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
+org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
new file mode 100644
index 0000000..d5f3595
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
@@ -0,0 +1,78 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+<meta charset="utf-8" />
+<title>Distributed Map Cache Client Service</title>
+<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+	<h2>Description:</h2>
+
+	<p>A Controller Service that can be used to communicate with a
+		Distributed Map Cache Server.</p>
+
+
+
+	<p>
+		<strong>Properties:</strong>
+	</p>
+	<p>In the list below, the names of required properties appear
+		in bold. Any other properties (not in bold) are considered optional.
+		If a property has a default value, it is indicated. If a property
+		supports the use of the NiFi Expression Language (or simply,
+		"expression language"), that is also indicated.</p>
+
+	<ul>
+		<li><strong>Server Hostname</strong>
+			<ul>
+				<li>The name of the server that is running the DistributedMapCacheServer service</li>
+				<li>Default value: no default</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li><strong>Server Port</strong>
+			<ul>
+				<li>The port on the remote server that is to be used when communicating with the 
+				<a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li>
+				
+				<li>Default value: 4557</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li>SSL Context Service
+			<ul>
+				<li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted</li>
+				<li>Default value: no default</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+		<li><strong>Communications Timeout</strong>
+			<ul>
+				<li>Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received</li>
+				<li>Default value: 30 secs</li>
+				<li>Supports expression language: false</li>
+			</ul></li>
+			
+	</ul>
+
+
+	<i>See Also:</i>
+	<ul>
+		<li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li>
+		<li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
+	</ul>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
new file mode 100644
index 0000000..f636261
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>distributed-cache-services-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>distributed-cache-protocol</artifactId>
+    <name>Distributed Cache Protocol</name>
+
+    <description>
+        Defines the communications protocol that is used between clients and servers 
+        for the Distributed Cache services
+    </description>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>remote-communications-utils</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
new file mode 100644
index 0000000..da2acad
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+
+/**
+ * Implements the handshake that is performed when a connection between a
+ * Distributed Cache client and server is established: the client sends a
+ * magic header and then both sides negotiate the protocol version to use.
+ */
+public class ProtocolHandshake {
+
+    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };
+
+    public static final int RESOURCE_OK = 20;
+    public static final int DIFFERENT_RESOURCE_VERSION = 21;
+    public static final int ABORT = 255;
+
+
+    /**
+     * Performs the client side of the handshake: writes the magic header and
+     * negotiates the protocol version with the server. The output stream is
+     * flushed before returning, whether or not the handshake succeeded.
+     *
+     * @param in stream to read the server's responses from
+     * @param out stream to write the handshake to
+     * @param versionNegotiator holds the proposed version and is updated with the agreed one
+     * @throws IOException if communication fails
+     * @throws HandshakeException if no version can be agreed on, the server aborts,
+     *             or an unexpected response code is received
+     */
+    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+
+        try {
+            dos.write(MAGIC_HEADER);
+
+            initiateVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+
+
+    /**
+     * Performs the server side of the handshake: consumes the magic header and
+     * negotiates the protocol version with the client. The output stream is
+     * flushed before returning, whether or not the handshake succeeded.
+     *
+     * @param in stream to read the client's handshake from
+     * @param out stream to write responses to
+     * @param versionNegotiator knows which versions are supported and is updated with the agreed one
+     * @throws IOException if communication fails
+     * @throws HandshakeException if no acceptable version can be negotiated
+     */
+    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+
+        try {
+            // NOTE(review): the header bytes are consumed but not compared with
+            // MAGIC_HEADER, so a peer sending any leading bytes is accepted here.
+            final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
+            dis.readFully(magicHeaderBuffer);
+
+            receiveVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+
+
+    /**
+     * Proposes the negotiator's current version to the server and handles the
+     * response, re-proposing a lower version if the server asks for a different one.
+     */
+    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        // Propose our current protocol version to the server
+        dos.writeInt(negotiator.getVersion());
+        dos.flush();
+
+        // wait for response from server.
+        final int statusCode = dis.read();
+        switch (statusCode) {
+            case RESOURCE_OK:   // server accepted the version we proposed
+                return;
+            case DIFFERENT_RESOURCE_VERSION:    // server does not support the version we proposed
+                // Get server's preferred version
+                final int newVersion = dis.readInt();
+
+                // Determine our new preferred version that is no greater than the server's preferred version.
+                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+                // If we could not agree with server on a version, fail now.
+                if ( newPreference == null ) {
+                    throw new HandshakeException("Could not agree on protocol version");
+                }
+
+                negotiator.setVersion(newPreference);
+
+                // Attempt negotiation again based on our new preferred version.
+                initiateVersionNegotiation(negotiator, dis, dos);
+                // The nested negotiation either succeeded or threw; without this
+                // return, control would fall through into the ABORT case.
+                return;
+            case ABORT:
+                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            default:
+                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
+        }
+    }
+
+    /**
+     * Reads the version proposed by the client and either accepts it, answers
+     * with this side's preferred version and waits for a new proposal, or
+     * aborts the handshake if no acceptable version exists.
+     */
+    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        final int version = dis.readInt();
+        if ( negotiator.isVersionSupported(version) ) {
+            dos.write(RESOURCE_OK);
+            dos.flush();
+
+            negotiator.setVersion(version);
+        } else {
+            final Integer preferred = negotiator.getPreferredVersion(version);
+            if ( preferred == null ) {
+                final String abortMessage = "Unable to negotiate an acceptable version of the Distributed Cache Protocol";
+                dos.write(ABORT);
+                // The initiating side reads a UTF string after ABORT, so send the reason along.
+                dos.writeUTF(abortMessage);
+                dos.flush();
+                throw new HandshakeException(abortMessage);
+            }
+            dos.write(DIFFERENT_RESOURCE_VERSION);
+            dos.writeInt(preferred);
+            dos.flush();
+
+            receiveVersionNegotiation(negotiator, dis, dos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
new file mode 100644
index 0000000..8049d42
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol.exception;
+
+/**
+ * Indicates that the Distributed Cache Protocol handshake with a remote peer could not be
+ * completed, for example because no acceptable protocol version could be negotiated.
+ */
+public class HandshakeException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of why the handshake failed
+     */
+    public HandshakeException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying failure that prevented the handshake from completing
+     */
+    public HandshakeException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message description of why the handshake failed
+     * @param cause the underlying failure that prevented the handshake from completing
+     */
+    public HandshakeException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
new file mode 100644
index 0000000..b57d284
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
@@ -0,0 +1,78 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>distributed-cache-services-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>distributed-cache-server</artifactId>
+
+    <name>Distributed Cache Server</name>
+    <description>Provides a Controller Service for hosting Distributed Caches</description>
+
+    <!-- No versions are declared on these entries; presumably they are managed by a parent POM (TODO confirm). -->
+    <dependencies>
+        <!-- Dependencies declared without an explicit scope -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>remote-communications-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-stream-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>wali</artifactId>
+        </dependency>
+        <!-- Test-scoped dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-client-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>distributed-cache-client-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- NOTE(review): listed after the test-scoped entries but has no scope of its own; confirm whether test scope was intended. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>ssl-context-service</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
new file mode 100644
index 0000000..9b4e70e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for cache servers that accept socket connections on a configured port, optionally
+ * secured with an {@link SSLContext}. Each accepted connection is served on its own daemon
+ * thread: after the protocol handshake completes, {@link #listen(InputStream, OutputStream, int)}
+ * is invoked repeatedly until it returns <code>false</code> or communication fails.
+ */
+public abstract class AbstractCacheServer implements CacheServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
+
+    // upper bound, in milliseconds, that stop() waits for each communications thread to exit
+    private static final long THREAD_TERMINATION_WAIT_MILLIS = 250L;
+
+    private final String identifier;
+    private final int port;
+    private final SSLContext sslContext;
+    protected volatile boolean stopped = false;
+    // threads currently serving a connection; each thread removes itself when it finishes
+    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
+
+    private volatile ServerSocketChannel serverSocketChannel;
+
+    /**
+     * @param identifier identifier used in thread names and log messages
+     * @param sslContext SSL context used to secure connections, or <code>null</code> for plain sockets
+     * @param port port on which to accept connections
+     */
+    public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
+        this.identifier = identifier;
+        this.port = port;
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Binds the server socket and starts a daemon thread that accepts incoming connections.
+     *
+     * @throws IOException if the server socket cannot be opened or bound
+     */
+    @Override
+    public void start() throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(true);
+        serverSocketChannel.bind(new InetSocketAddress(port));
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                acceptConnections();
+            }
+        });
+        thread.setDaemon(true);
+        thread.setName("Distributed Cache Server: " + identifier);
+        thread.start();
+    }
+
+    /**
+     * Accepts connections until accepting fails (which is the case once the server socket has
+     * been closed), starting a dedicated daemon thread for each accepted connection.
+     */
+    private void acceptConnections() {
+        while (true) {
+            final SocketChannel socketChannel;
+            try {
+                socketChannel = serverSocketChannel.accept();
+                logger.debug("Connected to {}", new Object[] { socketChannel });
+            } catch (final IOException e) {
+                if (!stopped) {
+                    logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.error("", e);
+                    }
+                }
+                return;
+            }
+
+            final Thread processInputThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        handleConnection(socketChannel);
+                    } finally {
+                        // always deregister, including when stream setup failed
+                        processInputThreads.remove(Thread.currentThread());
+                    }
+                }
+            });
+            processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
+            processInputThread.setDaemon(true);
+            // Register before starting so that a short-lived thread cannot remove itself
+            // before it has been added, which would leave a stale entry behind.
+            processInputThreads.add(processInputThread);
+            processInputThread.start();
+        }
+    }
+
+    /**
+     * Serves a single connection: wraps the channel in (optionally SSL) streams, performs the
+     * protocol handshake and then calls {@link #listen(InputStream, OutputStream, int)} until it
+     * returns <code>false</code>. The channel is closed when this method returns.
+     */
+    private void handleConnection(final SocketChannel socketChannel) {
+        final InputStream rawInputStream;
+        final OutputStream rawOutputStream;
+        final String peer = socketChannel.socket().getInetAddress().getHostName();
+
+        try {
+            if (sslContext == null) {
+                rawInputStream = new SocketChannelInputStream(socketChannel);
+                rawOutputStream = new SocketChannelOutputStream(socketChannel);
+            } else {
+                final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
+                sslSocketChannel.connect();
+                rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
+                rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
+            }
+        } catch (final IOException e) {
+            logger.error("Cannot create input and/or output streams for {}", identifier, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(socketChannel);
+            return;
+        }
+
+        try (final InputStream in = new BufferedInputStream(rawInputStream);
+                final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+
+            final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+
+            ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
+
+            boolean continueComms = true;
+            while (continueComms) {
+                continueComms = listen(in, out, versionNegotiator.getVersion());
+            }
+            // client has issued 'close'
+            logger.debug("Client issued close on {}", new Object[] { socketChannel });
+        } catch (final SocketTimeoutException e) {
+            logger.debug("30 sec timeout reached", e);
+        } catch (final IOException | HandshakeException e) {
+            if (!stopped) {
+                logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() });
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        } finally {
+            // release the connection explicitly rather than relying on the stream wrappers
+            closeQuietly(socketChannel);
+        }
+    }
+
+    /**
+     * Closes the given channel, ignoring any failure; used only on paths where the connection
+     * is already being abandoned and nothing useful can be done about a close error.
+     */
+    private static void closeQuietly(final SocketChannel channel) {
+        try {
+            channel.close();
+        } catch (final IOException ignored) {
+            // best effort: the connection is being discarded anyway
+        }
+    }
+
+    /**
+     * Stops accepting connections and interrupts every communications thread, waiting a short,
+     * bounded time for each of them to terminate.
+     *
+     * @throws IOException if the server socket cannot be closed
+     */
+    @Override
+    public void stop() throws IOException {
+        stopped = true;
+        logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
+
+        if (serverSocketChannel != null) {
+            serverSocketChannel.close();
+        }
+        // need to close out the created SocketChannels...this is done by interrupting
+        // the created threads that loop on listen().
+        boolean interrupted = false;
+        for (final Thread processInputThread : processInputThreads) {
+            processInputThread.interrupt();
+            if (!interrupted) {
+                try {
+                    // allow thread to gracefully terminate; returns immediately if it already has
+                    processInputThread.join(THREAD_TERMINATION_WAIT_MILLIS);
+                } catch (final InterruptedException e) {
+                    // stop waiting, but still interrupt the remaining threads
+                    interrupted = true;
+                }
+            }
+        }
+        processInputThreads.clear();
+
+        if (interrupted) {
+            // preserve the caller's interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "CacheServer[id=" + identifier + "]";
+    }
+
+    /**
+     * Listens for incoming data and communicates with remote peer
+     *
+     * @param in buffered stream of data received from the remote peer
+     * @param out buffered stream for data to send to the remote peer
+     * @param version protocol version agreed upon during the handshake
+     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
+     * @throws IOException if communication with the remote peer fails
+     */
+    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
new file mode 100644
index 0000000..71ac56d
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Bookkeeping for a single cache entry: a unique id, the time the record was created, the time
+ * it was last hit and the number of hits so far.
+ */
+public class CacheRecord {
+
+    // shared source of record ids; the first record created receives id 0
+    private static final AtomicLong nextId = new AtomicLong(0L);
+
+    private final long id;
+    private final long entryDate;
+    private final AtomicInteger hitCount = new AtomicInteger(0);
+    private volatile long lastHitDate;
+
+    /**
+     * Creates a record stamped with the current time and assigns it the next available id.
+     * The last-hit time initially equals the entry time.
+     */
+    public CacheRecord() {
+        final long now = System.currentTimeMillis();
+        this.entryDate = now;
+        this.lastHitDate = now;
+        this.id = nextId.getAndIncrement();
+    }
+
+    /**
+     * @return the id assigned to this record at construction
+     */
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * @return the time, in milliseconds since the epoch, at which this record was created
+     */
+    public long getEntryDate() {
+        return entryDate;
+    }
+
+    /**
+     * @return the time, in milliseconds since the epoch, of the most recent hit, or the entry
+     *         time if the record has not been hit
+     */
+    public long getLastHitDate() {
+        return lastHitDate;
+    }
+
+    /**
+     * @return the number of times {@link #hit()} has been called on this record
+     */
+    public int getHitCount() {
+        return hitCount.get();
+    }
+
+    /**
+     * Records a hit: increments the hit count and updates the last-hit time to now.
+     */
+    public void hit() {
+        hitCount.incrementAndGet();
+        lastHitDate = System.currentTimeMillis();
+    }
+}