You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/15 11:14:54 UTC

[09/12] incubator-nifi git commit: NIFI-169 well it finally all builds. There is a classpath issue still to sort out which impacts startup

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
deleted file mode 100644
index 8049d42..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.protocol.exception;
-
-public class HandshakeException extends Exception {
-    public HandshakeException(final String message) {
-        super(message);
-    }
-    
-    public HandshakeException(final Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
deleted file mode 100644
index 5dec322..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.nifi</groupId>
-		<artifactId>distributed-cache-services-bundle</artifactId>
-		<version>0.0.1-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>distributed-cache-server</artifactId>
-
-	<name>Distributed Cache Server</name>
-	<description>Provides a Controller Service for hosting Distributed Caches</description>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-api</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>distributed-cache-protocol</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>remote-communications-utils</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-processor-utils</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-stream-utils</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>ssl-context-service-api</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>wali</groupId>
-			<artifactId>wali</artifactId>
-		</dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>distributed-cache-client-service-api</artifactId>
-            <scope>test</scope>
-        </dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>distributed-cache-client-service</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-mock</artifactId>
-		</dependency>
-
-		<dependency>
-		    <groupId>org.apache.nifi</groupId>
-		    <artifactId>ssl-context-service</artifactId>
-		</dependency>
-	</dependencies>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
deleted file mode 100644
index 9b4e70e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.io.BufferedInputStream;
-import org.apache.nifi.io.BufferedOutputStream;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractCacheServer implements CacheServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
-
-    private final String identifier;
-    private final int port;
-    private final SSLContext sslContext;
-    protected volatile boolean stopped = false;
-    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();;
-
-    private volatile ServerSocketChannel serverSocketChannel;
-
-    public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
-        this.identifier = identifier;
-        this.port = port;
-        this.sslContext = sslContext;
-    }
-
-    @Override
-    public void start() throws IOException {
-        serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(true);
-        serverSocketChannel.bind(new InetSocketAddress(port));
-
-        final Runnable runnable = new Runnable() {
-
-            @Override
-            public void run() {
-                while (true) {
-                    final SocketChannel socketChannel;
-                    try {
-                        socketChannel = serverSocketChannel.accept();
-                        logger.debug("Connected to {}", new Object[] { socketChannel });
-                    } catch (final IOException e) {
-                        if (!stopped) {
-                            logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
-                            if (logger.isDebugEnabled()) {
-                                logger.error("", e);
-                            }
-                        }
-                        return;
-                    }
-
-                    final Runnable processInputRunnable = new Runnable() {
-                        @Override
-                        public void run() {
-                            final InputStream rawInputStream;
-                            final OutputStream rawOutputStream;
-                            final String peer = socketChannel.socket().getInetAddress().getHostName();
-
-                            try {
-                                if (sslContext == null) {
-                                    rawInputStream = new SocketChannelInputStream(socketChannel);
-                                    rawOutputStream = new SocketChannelOutputStream(socketChannel);
-                                } else {
-                                    final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
-                                    sslSocketChannel.connect();
-                                    rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
-                                    rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
-                                }
-                            } catch (IOException e) {
-                                logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e);
-                                if (logger.isDebugEnabled()) {
-                                    logger.error("", e);
-                                }
-                                try {
-                                    socketChannel.close();
-                                } catch (IOException swallow) {
-                                }
-                               
-                                return;
-                            }
-                            try (final InputStream in = new BufferedInputStream(rawInputStream);
-                                    final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
-
-                                final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
-
-                                ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
-
-                                boolean continueComms = true;
-                                while (continueComms) {
-                                    continueComms = listen(in, out, versionNegotiator.getVersion());
-                                }
-                                // client has issued 'close'
-                                logger.debug("Client issued close on {}", new Object[] { socketChannel });
-                            } catch (final SocketTimeoutException e) {
-                                logger.debug("30 sec timeout reached", e);
-                            } catch (final IOException | HandshakeException e) {
-                                if (!stopped) {
-                                    logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() });
-                                    if (logger.isDebugEnabled()) {
-                                        logger.error("", e);
-                                    }
-                                }
-                            } finally {
-                                processInputThreads.remove(Thread.currentThread());
-                            }
-                        }
-                    };
-
-                    final Thread processInputThread = new Thread(processInputRunnable);
-                    processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
-                    processInputThread.setDaemon(true);
-                    processInputThread.start();
-                    processInputThreads.add(processInputThread);
-                }
-            }
-        };
-
-        final Thread thread = new Thread(runnable);
-        thread.setDaemon(true);
-        thread.setName("Distributed Cache Server: " + identifier);
-        thread.start();
-    }
-
-    @Override
-    public void stop() throws IOException {
-        stopped = true;
-        logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
-
-        if (serverSocketChannel != null) {
-            serverSocketChannel.close();
-        }
-        // need to close out the created SocketChannels...this is done by interrupting
-        // the created threads that loop on listen().
-        for (Thread processInputThread : processInputThreads) {
-            processInputThread.interrupt();
-            int i = 0;
-            while (!processInputThread.isInterrupted() && i++ < 5) {
-                try {
-                    Thread.sleep(50); // allow thread to gracefully terminate
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-        processInputThreads.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "CacheServer[id=" + identifier + "]";
-    }
-
-    /**
-     * Listens for incoming data and communicates with remote peer
-     * 
-     * @param in
-     * @param out
-     * @param version
-     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
-     * @throws IOException
-     */
-    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
deleted file mode 100644
index 71ac56d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class CacheRecord {
-
-    private static final AtomicLong idGenerator = new AtomicLong(0L);
-    
-    private final long id;
-    private final long entryDate;
-    private volatile long lastHitDate;
-    private final AtomicInteger hitCount = new AtomicInteger(0);
-    
-    public CacheRecord() {
-        entryDate = System.currentTimeMillis();
-        lastHitDate = entryDate;
-        id = idGenerator.getAndIncrement();
-    }
-    
-    public long getEntryDate() {
-        return entryDate;
-    }
-    
-    public long getLastHitDate() {
-        return lastHitDate;
-    }
-    
-    public int getHitCount() {
-        return hitCount.get();
-    }
-
-    public void hit() {
-        hitCount.getAndIncrement();
-        lastHitDate = System.currentTimeMillis();
-    }
-    
-    public long getId() {
-        return id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
deleted file mode 100644
index 2c85cd8..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-
-public interface CacheServer {
-
-    void start() throws IOException;
-    void stop() throws IOException;
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
deleted file mode 100644
index 0f962d0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.processor.annotation.OnShutdown;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-
-public abstract class DistributedCacheServer extends AbstractControllerService {
-    public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
-    public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
-    public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
-
-    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port to listen on for incoming connections")
-            .required(true)
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .defaultValue("4557")
-            .build();
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description(
-                    "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
-            .required(false)
-            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
-            .build();
-    public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
-            .name("Maximum Cache Entries")
-            .description("The maximum number of cache entries that the cache can hold")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10000")
-            .build();
-    public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
-            .name("Eviction Strategy")
-            .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
-            .required(true)
-            .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
-            .defaultValue(EVICTION_STRATEGY_LFU)
-            .build();
-    public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
-            .name("Persistence Directory")
-            .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
-            .required(false)
-            .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
-            .build();
-
-    private volatile CacheServer cacheServer;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(PORT);
-        properties.add(MAX_CACHE_ENTRIES);
-        properties.add(EVICTION_POLICY);
-        properties.add(PERSISTENCE_PATH);
-        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
-                getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
-        return properties;
-    }
-
-    @OnConfigured
-    public void startServer(final ConfigurationContext context) throws IOException {
-        if (cacheServer == null) {
-            cacheServer = createCacheServer(context);
-            cacheServer.start();
-        }
-    }
-
-    @OnShutdown
-    public void shutdownServer() throws IOException {
-        if (cacheServer != null) {
-            cacheServer.stop();
-        }
-        cacheServer = null;
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        shutdownServer();
-    }
-
-    protected abstract CacheServer createCacheServer(ConfigurationContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
deleted file mode 100644
index 426573f..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.File;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-public class DistributedSetCacheServer extends DistributedCacheServer {
-
-    @Override
-    protected CacheServer createCacheServer(final ConfigurationContext context) {
-        final int port = context.getProperty(PORT).asInteger();
-        final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
-        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
-        
-        final SSLContext sslContext;
-        if ( sslContextService == null ) {
-            sslContext = null;
-        } else {
-            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
-        }
-        
-        final EvictionPolicy evictionPolicy;
-        switch (evictionPolicyName) {
-            case EVICTION_STRATEGY_FIFO:
-                evictionPolicy = EvictionPolicy.FIFO;
-                break;
-            case EVICTION_STRATEGY_LFU:
-                evictionPolicy = EvictionPolicy.LFU;
-                break;
-            case EVICTION_STRATEGY_LRU:
-                evictionPolicy = EvictionPolicy.LRU;
-                break;
-            default:
-                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
-        }
-        
-        try {
-            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
-            
-            return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
deleted file mode 100644
index 60bd2c1..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.util.Comparator;
-
-public enum EvictionPolicy {
-    LFU(new LFUComparator()),
-    LRU(new LRUComparator()),
-    FIFO(new FIFOComparator());
-    
-    private final Comparator<CacheRecord> comparator;
-    
-    private EvictionPolicy(final Comparator<CacheRecord> comparator) {
-        this.comparator = comparator;
-    }
-    
-    public Comparator<CacheRecord> getComparator() {
-        return comparator;
-    }
-    
-    public static class LFUComparator implements Comparator<CacheRecord> {
-        @Override
-        public int compare(final CacheRecord o1, final CacheRecord o2) {
-            if ( o1.equals(o2) ) {
-                return 0;
-            }
-            
-            final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount());
-            final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison;
-            return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
-        }
-    }
-    
-    public static class LRUComparator implements Comparator<CacheRecord> {
-        @Override
-        public int compare(final CacheRecord o1, final CacheRecord o2) {
-            if ( o1.equals(o2) ) {
-                return 0;
-            }
-
-            final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate());
-            return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison);
-        }
-    }
-    
-    public static class FIFOComparator implements Comparator<CacheRecord> {
-        @Override
-        public int compare(final CacheRecord o1, final CacheRecord o2) {
-            if ( o1.equals(o2) ) {
-                return 0;
-            }
-
-            final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate());
-            return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
deleted file mode 100644
index 5d2c0f6..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
-import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
-import org.apache.nifi.io.DataOutputStream;
-
-public class SetCacheServer extends AbstractCacheServer {
-
-    private final SetCache cache;
-
-    public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
-            final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
-        super(identifier, sslContext, port);
-
-        final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
-
-        if (persistencePath == null) {
-            this.cache = simpleCache;
-        } else {
-            final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
-            persistentCache.restore();
-            this.cache = persistentCache;
-        }
-    }
-
-    @Override
-    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-
-        final String action = dis.readUTF();
-        if (action.equals("close")) {
-            return false;
-        }
-
-        final int valueLength = dis.readInt();
-        final byte[] value = new byte[valueLength];
-        dis.readFully(value);
-        final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
-
-        final SetCacheResult response;
-        switch (action) {
-        case "addIfAbsent":
-            response = cache.addIfAbsent(valueBuffer);
-            break;
-        case "contains":
-            response = cache.contains(valueBuffer);
-            break;
-        case "remove":
-            response = cache.remove(valueBuffer);
-            break;
-        default:
-            throw new IOException("IllegalRequest");
-        }
-
-        dos.writeBoolean(response.getResult());
-        dos.flush();
-
-        return true;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        try {
-            super.stop();
-        } finally {
-            cache.shutdown();
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        if (!stopped)
-            stop();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
deleted file mode 100644
index 920529d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.File;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.server.CacheServer;
-import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-public class DistributedMapCacheServer extends DistributedCacheServer {
-
-    @Override
-    protected CacheServer createCacheServer(final ConfigurationContext context) {
-        final int port = context.getProperty(PORT).asInteger();
-        final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
-        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
-        
-        final SSLContext sslContext;
-        if ( sslContextService == null ) {
-            sslContext = null;
-        } else {
-            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
-        }
-        
-        final EvictionPolicy evictionPolicy;
-        switch (evictionPolicyName) {
-            case EVICTION_STRATEGY_FIFO:
-                evictionPolicy = EvictionPolicy.FIFO;
-                break;
-            case EVICTION_STRATEGY_LFU:
-                evictionPolicy = EvictionPolicy.LFU;
-                break;
-            case EVICTION_STRATEGY_LRU:
-                evictionPolicy = EvictionPolicy.LRU;
-                break;
-            default:
-                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
-        }
-        
-        try {
-            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
-            
-            return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
deleted file mode 100644
index 534cb0b..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface MapCache {
-
-    MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
-    boolean containsKey(ByteBuffer key) throws IOException;
-    ByteBuffer get(ByteBuffer key) throws IOException;
-    ByteBuffer remove(ByteBuffer key) throws IOException;
-    void shutdown() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
deleted file mode 100644
index b0ab0c4..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.distributed.cache.server.CacheRecord;
-
-public class MapCacheRecord extends CacheRecord {
-    private final ByteBuffer key;
-    private final ByteBuffer value;
-    
-    public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
-        this.key = key;
-        this.value = value;
-    }
-    
-    public ByteBuffer getKey() {
-        return key;
-    }
-    
-    public ByteBuffer getValue() {
-        return value;
-    }
-    
-    @Override
-    public int hashCode() {
-        return 2938476 + key.hashCode() * value.hashCode();
-    }
-    
-    @Override
-    public boolean equals(final Object obj) {
-        if ( obj == this ) {
-            return true;
-        }
-        
-        if ( obj instanceof MapCacheRecord ) {
-            final MapCacheRecord that = ((MapCacheRecord) obj);
-            return key.equals(that.key) && value.equals(that.value);
-        }
-        
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
deleted file mode 100644
index 3e8dd0e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.io.DataOutputStream;
-
-public class MapCacheServer extends AbstractCacheServer {
-
-    private final MapCache cache;
-
-    public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
-            final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
-        super(identifier, sslContext, port);
-
-        final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
-
-        if (persistencePath == null) {
-            this.cache = simpleCache;
-        } else {
-            final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
-            persistentCache.restore();
-            this.cache = persistentCache;
-        }
-    }
-
-    @Override
-    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-        final String action = dis.readUTF();
-        try {
-            switch (action) {
-            case "close": {
-                return false;
-            }
-            case "putIfAbsent": {
-                final byte[] key = readValue(dis);
-                final byte[] value = readValue(dis);
-                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-                dos.writeBoolean(putResult.isSuccessful());
-                break;
-            }
-            case "containsKey": {
-                final byte[] key = readValue(dis);
-                final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
-                dos.writeBoolean(contains);
-                break;
-            }
-            case "getAndPutIfAbsent": {
-                final byte[] key = readValue(dis);
-                final byte[] value = readValue(dis);
-
-                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-                if (putResult.isSuccessful()) {
-                    // Put was successful. There was no old value to get.
-                    dos.writeInt(0);
-                } else {
-                    // we didn't put. Write back the previous value
-                    final byte[] byteArray = putResult.getExistingValue().array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-
-                break;
-            }
-            case "get": {
-                final byte[] key = readValue(dis);
-                final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
-                if (existingValue == null) {
-                    // there was no existing value; we did a "put".
-                    dos.writeInt(0);
-                } else {
-                    // a value already existed. we did not update the map
-                    final byte[] byteArray = existingValue.array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-
-                break;
-            }
-            case "remove": {
-                final byte[] key = readValue(dis);
-                final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
-                dos.writeBoolean(removed);
-                break;
-            }
-            default: {
-                throw new IOException("Illegal Request");
-            }
-            }
-        } finally {
-            dos.flush();
-        }
-
-        return true;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        try {
-            super.stop();
-        } finally {
-            cache.shutdown();
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        if (!stopped)
-            stop();
-    }
-
-    private byte[] readValue(final DataInputStream dis) throws IOException {
-        final int numBytes = dis.readInt();
-        final byte[] buffer = new byte[numBytes];
-        dis.readFully(buffer);
-        return buffer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
deleted file mode 100644
index 29695eb..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.nio.ByteBuffer;
-
-public class MapPutResult {
-    private final boolean successful;
-    private final ByteBuffer key, value;
-    private final ByteBuffer existingValue;
-    private final ByteBuffer evictedKey, evictedValue;
-    
-    public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
-        this.successful = successful;
-        this.key = key;
-        this.value = value;
-        this.existingValue = existingValue;
-        this.evictedKey = evictedKey;
-        this.evictedValue = evictedValue;
-    }
-
-    public boolean isSuccessful() {
-        return successful;
-    }
-
-    public ByteBuffer getKey() {
-        return key;
-    }
-
-    public ByteBuffer getValue() {
-        return value;
-    }
-    
-    public ByteBuffer getExistingValue() {
-        return existingValue;
-    }
-
-    public ByteBuffer getEvictedKey() {
-        return evictedKey;
-    }
-
-    public ByteBuffer getEvictedValue() {
-        return evictedValue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
deleted file mode 100644
index 77fb77d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
-
-public class PersistentMapCache implements MapCache {
-
-    private final MapCache wrapped;
-    private final WriteAheadRepository<MapWaliRecord> wali;
-    
-    private final AtomicLong modifications = new AtomicLong(0L);
-    
-    public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException {
-        wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
-        wrapped = cacheToWrap;
-    }
-
-    synchronized void restore() throws IOException {
-        final Collection<MapWaliRecord> recovered = wali.recoverRecords();
-        for ( final MapWaliRecord record : recovered ) {
-            if ( record.getUpdateType() == UpdateType.CREATE ) {
-                wrapped.putIfAbsent(record.getKey(), record.getValue());
-            }
-        }
-    }
-
-    @Override
-    public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
-        final MapPutResult putResult = wrapped.putIfAbsent(key, value);
-        if ( putResult.isSuccessful() ) {
-            // The put was successful.
-            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
-            final List<MapWaliRecord> records = new ArrayList<>();
-            records.add(record);
-
-            if ( putResult.getEvictedKey() != null ) {
-                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
-            }
-            
-            wali.update(Collections.singletonList(record), false);
-            
-            final long modCount = modifications.getAndIncrement();
-            if ( modCount > 0 && modCount % 100000 == 0 ) {
-                wali.checkpoint();
-            }
-        }
-        
-        return putResult;
-    }
-
-    @Override
-    public boolean containsKey(final ByteBuffer key) throws IOException {
-        return wrapped.containsKey(key);
-    }
-
-    @Override
-    public ByteBuffer get(final ByteBuffer key) throws IOException {
-        return wrapped.get(key);
-    }
-
-    @Override
-    public ByteBuffer remove(ByteBuffer key) throws IOException {
-        final ByteBuffer removeResult = wrapped.remove(key);
-        if ( removeResult != null ) {
-            final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
-            final List<MapWaliRecord> records = new ArrayList<>(1);
-            records.add(record);
-            wali.update(records, false);
-            
-            final long modCount = modifications.getAndIncrement();
-            if ( modCount > 0 && modCount % 1000 == 0 ) {
-                wali.checkpoint();
-            }
-        }
-        return removeResult;
-    }
-
-
-    @Override
-    public void shutdown() throws IOException {
-        wali.shutdown();
-    }
-
-
-    private static class MapWaliRecord {
-        private final UpdateType updateType;
-        private final ByteBuffer key;
-        private final ByteBuffer value;
-        
-        public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) {
-            this.updateType = updateType;
-            this.key = key;
-            this.value = value;
-        }
-        
-        public UpdateType getUpdateType() {
-            return updateType;
-        }
-        
-        public ByteBuffer getKey() {
-            return key;
-        }
-        
-        public ByteBuffer getValue() {
-            return value;
-        }
-    }
-    
-    private static class Serde implements SerDe<MapWaliRecord> {
-
-        @Override
-        public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
-            final UpdateType updateType = newRecordState.getUpdateType();
-            if ( updateType == UpdateType.DELETE ) {
-                out.write(0);
-            } else {
-                out.write(1);
-            }
-            
-            final byte[] key = newRecordState.getKey().array();
-            final byte[] value = newRecordState.getValue().array();
-            
-            out.writeInt(key.length);
-            out.write(key);
-            out.writeInt(value.length);
-            out.write(value);
-        }
-
-        @Override
-        public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException {
-            serializeEdit(null, record, out);
-        }
-
-        @Override
-        public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException {
-            final int updateTypeValue = in.read();
-            if ( updateTypeValue < 0 ) {
-                throw new EOFException();
-            }
-
-            final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE);
-            
-            final int keySize = in.readInt();
-            final byte[] key = new byte[keySize];
-            in.readFully(key);
-
-            final int valueSize = in.readInt();
-            final byte[] value = new byte[valueSize];
-            in.readFully(value);
-
-            return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-        }
-
-        @Override
-        public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException {
-            return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version);
-        }
-
-        @Override
-        public Object getRecordIdentifier(final MapWaliRecord record) {
-            return record.getKey();
-        }
-
-        @Override
-        public UpdateType getUpdateType(final MapWaliRecord record) {
-            return record.getUpdateType();
-        }
-
-        @Override
-        public String getLocation(final MapWaliRecord record) {
-            return null;
-        }
-
-        @Override
-        public int getVersion() {
-            return 1;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
deleted file mode 100644
index 10139f1..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleMapCache implements MapCache {
-    private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
-
-    private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
-    private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
-    
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
-    
-    private final String serviceIdentifier;
-    
-    private final int maxSize;
-    
-    public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
-        // need to change to ConcurrentMap as this is modified when only the readLock is held
-        inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
-        this.serviceIdentifier = serviceIdentifier;
-        this.maxSize = maxSize;
-    }
-    
-    @Override
-    public String toString() {
-        return "SimpleSetCache[service id=" + serviceIdentifier + "]";
-    }
-
-    // don't need synchronized because this method is only called when the writeLock is held, and all 
-    // public methods obtain either the read or write lock
-    private MapCacheRecord evict() {
-        if ( cache.size() < maxSize ) {
-            return null;
-        }
-        
-        final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
-        final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
-        cache.remove(valueToEvict);
-        
-        if ( logger.isDebugEnabled() ) {
-            logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
-        }
-        
-        return recordToEvict;
-    }
-
-    @Override
-    public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) {
-        writeLock.lock();
-        try {
-            final MapCacheRecord record = cache.get(key);
-            if ( record == null ) {
-                // Record is null. We will add.
-                final MapCacheRecord evicted = evict();
-                final MapCacheRecord newRecord = new MapCacheRecord(key, value);
-                cache.put(key, newRecord);
-                inverseCacheMap.put(newRecord, key);
-                
-                if ( evicted == null ) {
-                    return new MapPutResult(true, key, value, null, null, null);
-                } else {
-                    return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
-                }
-            }
-            
-            // Record is not null. Increment hit count and return result indicating that record was not added.
-            inverseCacheMap.remove(record);
-            record.hit();
-            inverseCacheMap.put(record, key);
-            
-            return new MapPutResult(false, key, value, record.getValue(), null, null);
-        } finally {
-            writeLock.unlock();
-        }
-    }
-    
-    @Override
-    public boolean containsKey(final ByteBuffer key) {
-        readLock.lock();
-        try {
-            final MapCacheRecord record = cache.get(key);
-            if ( record == null ) {
-                return false;
-            }
-            
-            inverseCacheMap.remove(record);
-            record.hit();
-            inverseCacheMap.put(record, key);
-            
-            return true;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public ByteBuffer get(final ByteBuffer key) {
-        readLock.lock();
-        try {
-            final MapCacheRecord record = cache.get(key);
-            if ( record == null ) {
-                return null;
-            }
-            
-            inverseCacheMap.remove(record);
-            record.hit();
-            inverseCacheMap.put(record, key);
-            
-            return record.getValue();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
-    public ByteBuffer remove(ByteBuffer key) throws IOException {
-        writeLock.lock();
-        try {
-            final MapCacheRecord record = cache.remove(key);
-            if (record == null) {
-                return null;
-            }
-            inverseCacheMap.remove(record);
-            return record.getValue();
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    @Override
-    public void shutdown() throws IOException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
deleted file mode 100644
index 4d75fc0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
-
-public class PersistentSetCache implements SetCache {
-
-    private final SetCache wrapped;
-    private final WriteAheadRepository<SetRecord> wali;
-    
-    private final AtomicLong modifications = new AtomicLong(0L);
-    
-    public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException {
-        wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
-        wrapped = cacheToWrap;
-    }
-    
-    public synchronized void restore() throws IOException {
-        final Collection<SetRecord> recovered = wali.recoverRecords();
-        for ( final SetRecord record : recovered ) {
-            if ( record.getUpdateType() == UpdateType.CREATE ) {
-                addIfAbsent(record.getBuffer());
-            }
-        }
-    }
-    
-    @Override
-    public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException {
-        final SetCacheResult removeResult = wrapped.remove(value);
-        if ( removeResult.getResult() ) {
-            final SetRecord record = new SetRecord(UpdateType.DELETE, value);
-            final List<SetRecord> records = new ArrayList<>();
-            records.add(record);
-            wali.update(records, false);
-            
-            final long modCount = modifications.getAndIncrement();
-            if ( modCount > 0 && modCount % 1000 == 0 ) {
-                wali.checkpoint();
-            }
-        }
-
-        return removeResult;
-    }
-
-    @Override
-    public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException {
-        final SetCacheResult addResult = wrapped.addIfAbsent(value);
-        if ( addResult.getResult() ) {
-            final SetRecord record = new SetRecord(UpdateType.CREATE, value);
-            final List<SetRecord> records = new ArrayList<>();
-            records.add(record);
-            
-            final SetCacheRecord evictedRecord = addResult.getEvictedRecord();
-            if ( evictedRecord != null ) {
-                records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue()));
-            }
-            
-            wali.update(records, false);
-            
-            final long modCount = modifications.getAndIncrement();
-            if ( modCount > 0 && modCount % 1000 == 0 ) {
-                wali.checkpoint();
-            }
-        }
-        
-        return addResult;
-    }
-
-    @Override
-    public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException {
-        return wrapped.contains(value);
-    }
-    
-    @Override
-    public void shutdown() throws IOException {
-        wali.shutdown();
-    }
-    
-    private static class SetRecord {
-        private final UpdateType updateType;
-        private final ByteBuffer value;
-        
-        public SetRecord(final UpdateType updateType, final ByteBuffer value) {
-            this.updateType = updateType;
-            this.value = value;
-        }
-        
-        public UpdateType getUpdateType() {
-            return updateType;
-        }
-        
-        public ByteBuffer getBuffer() {
-            return value;
-        }
-        
-        public byte[] getData() {
-            return value.array();
-        }
-    }
-    
-    private static class Serde implements SerDe<SetRecord> {
-
-        @Override
-        public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException {
-            final UpdateType updateType = newRecordState.getUpdateType();
-            if ( updateType == UpdateType.DELETE ) {
-                out.write(0);
-            } else {
-                out.write(1);
-            }
-            
-            final byte[] data = newRecordState.getData();
-            out.writeInt(data.length);
-            out.write(newRecordState.getData());
-        }
-
-        @Override
-        public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException {
-            serializeEdit(null, record, out);
-        }
-
-        @Override
-        public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException {
-            final int value = in.read();
-            if ( value < 0 ) {
-                throw new EOFException();
-            }
-
-            final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE);
-            
-            final int size = in.readInt();
-            final byte[] data = new byte[size];
-            in.readFully(data);
-            
-            return new SetRecord(updateType, ByteBuffer.wrap(data));
-        }
-
-        @Override
-        public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException {
-            return deserializeEdit(in, new HashMap<Object, SetRecord>(), version);
-        }
-
-        @Override
-        public Object getRecordIdentifier(final SetRecord record) {
-            return record.getBuffer();
-        }
-
-        @Override
-        public UpdateType getUpdateType(final SetRecord record) {
-            return record.getUpdateType();
-        }
-
-        @Override
-        public String getLocation(final SetRecord record) {
-            return null;
-        }
-
-        @Override
-        public int getVersion() {
-            return 1;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
deleted file mode 100644
index bf6ae3e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface SetCache {
-
-    SetCacheResult remove(ByteBuffer value) throws IOException;
-    SetCacheResult addIfAbsent(ByteBuffer value) throws IOException;
-    SetCacheResult contains(ByteBuffer value) throws IOException;
-    void shutdown() throws IOException;
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
deleted file mode 100644
index 20b6fae..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.distributed.cache.server.CacheRecord;
-
-public class SetCacheRecord extends CacheRecord {
-    private final ByteBuffer value;
-    
-    public SetCacheRecord(final ByteBuffer value) {
-        this.value = value;
-    }
-    
-    public ByteBuffer getValue() {
-        return value;
-    }
-    
-    @Override
-    public int hashCode() {
-        return value.hashCode();
-    }
-    
-    @Override
-    public boolean equals(final Object obj) {
-        if ( this == obj ) {
-            return true;
-        }
-        
-        if (obj instanceof SetCacheRecord) {
-            return value.equals(((SetCacheRecord) obj).value);
-        }
-        return false;
-    }
-    
-    @Override
-    public String toString() {
-        return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
deleted file mode 100644
index 732c4f0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-
-
-public class SetCacheResult {
-    private final boolean result;
-    private final SetCacheRecord stats;
-    private final SetCacheRecord evictedRecord;
-    
-    public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) {
-        this.result = result;
-        this.stats = stats;
-        this.evictedRecord = evictedRecord;
-    }
-    
-    public boolean getResult() {
-        return result;
-    }
-    
-    public SetCacheRecord getRecord() {
-        return stats;
-    }
-    
-    public SetCacheRecord getEvictedRecord() {
-        return evictedRecord;
-    }
-}