You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/05/18 16:42:14 UTC

[nifi] branch main updated: NIFI-9805 Refactored Distributed Cache Servers using Netty

This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fe424a2d42 NIFI-9805 Refactored Distributed Cache Servers using Netty
fe424a2d42 is described below

commit fe424a2d420945847172fbcb3aedd070e5b7b953
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu May 12 00:05:11 2022 -0500

    NIFI-9805 Refactored Distributed Cache Servers using Netty
    
    - Added Map and Set Cache Servers based on nifi-event-transport components
    - Removed custom servers and unused socket stream components
    - Reduced duplication on protocol classes
    - Added checks for readable bytes
    - Added mark and reset handling for buffer reads
    
    This closes #6040
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../remote/io/socket/SocketChannelInputStream.java | 188 ---------------
 .../io/socket/SocketChannelOutputStream.java       | 124 ----------
 .../remote/io/socket/TestSocketChannelStreams.java | 231 -------------------
 .../{SetOperation.java => CacheOperation.java}     |  24 +-
 .../distributed/cache/operations/MapOperation.java |   3 +-
 .../distributed/cache/operations/SetOperation.java |   3 +-
 ...tOperation.java => StandardCacheOperation.java} |  10 +-
 .../nifi-distributed-cache-server/pom.xml          |   5 +
 .../cache/server/AbstractCacheServer.java          | 247 --------------------
 .../cache/server/DistributedSetCacheServer.java    |  10 +-
 .../distributed/cache/server/EventCacheServer.java | 113 +++++++++
 .../distributed/cache/server/SetCacheServer.java   | 102 ---------
 .../server/codec/CacheOperationResultEncoder.java} |  30 ++-
 .../cache/server/codec/CacheRequestDecoder.java    | 243 ++++++++++++++++++++
 .../server/codec/CacheVersionRequestHandler.java   |  81 +++++++
 .../server/codec/CacheVersionResponseEncoder.java  |  41 ++++
 .../cache/server/codec/MapCacheRequestDecoder.java | 150 ++++++++++++
 .../cache/server/codec/MapCacheRequestHandler.java | 178 +++++++++++++++
 .../server/codec/MapRemoveResponseEncoder.java}    |  29 ++-
 .../server/codec/MapSizeResponseEncoder.java}      |  29 ++-
 .../server/codec/MapValueResponseEncoder.java      |  43 ++++
 .../cache/server/codec/SetCacheRequestHandler.java |  80 +++++++
 .../server/map/DistributedMapCacheServer.java      |  23 +-
 .../cache/server/map/MapCacheServer.java           | 252 ---------------------
 .../cache/server/map/StandardMapCacheServer.java   | 124 ++++++++++
 .../server/protocol/CacheOperationResult.java}     |  24 +-
 .../cache/server/protocol/CacheRequest.java}       |  33 ++-
 .../server/protocol/CacheVersionRequest.java}      |  23 +-
 .../server/protocol/CacheVersionResponse.java}     |  29 ++-
 .../cache/server/protocol/MapCacheRequest.java     | 113 +++++++++
 .../cache/server/protocol/MapRemoveResponse.java}  |  23 +-
 .../cache/server/protocol/MapSizeResponse.java}    |  23 +-
 .../cache/server/protocol/MapValueResponse.java    |  49 ++--
 .../cache/server/set/StandardSetCacheServer.java   | 107 +++++++++
 .../cache/server/map/DistributedMapCacheTest.java  |  33 ++-
 .../server/map/DistributedMapCacheTlsTest.java     |  28 +--
 .../server/map/StandardMapCacheServerTest.java     | 175 ++++++++++++++
 .../map/TestDistributedMapServerAndClient.java     |  10 +-
 .../cache/server/set/DistributedSetCacheTest.java  |  25 +-
 39 files changed, 1673 insertions(+), 1385 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
deleted file mode 100644
index 21f1683cfe..0000000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-
-public class SocketChannelInputStream extends InputStream {
-
-    private final SocketChannel channel;
-    private volatile int timeoutMillis = 30000;
-    private volatile boolean interrupted = false;
-    private final Selector readSelector;
-
-    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-    private Byte bufferedByte = null;
-
-    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
-        // this class expects a non-blocking channel
-        socketChannel.configureBlocking(false);
-        this.channel = socketChannel;
-
-        readSelector = Selector.open();
-        this.channel.register(readSelector, SelectionKey.OP_READ);
-    }
-
-    public void setTimeout(final int timeoutMillis) {
-        this.timeoutMillis = timeoutMillis;
-    }
-
-    public void consume() throws IOException {
-        channel.shutdownInput();
-
-        final byte[] b = new byte[4096];
-        final ByteBuffer buffer = ByteBuffer.wrap(b);
-        int bytesRead;
-        do {
-            bytesRead = channel.read(buffer);
-            buffer.flip();
-        } while (bytesRead > 0);
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (bufferedByte != null) {
-            final int retVal = bufferedByte & 0xFF;
-            bufferedByte = null;
-            return retVal;
-        }
-
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-
-        final long maxTime = System.currentTimeMillis() + timeoutMillis;
-
-        waitForReady();
-
-        int bytesRead;
-        do {
-            bytesRead = channel.read(oneByteBuffer);
-            if (bytesRead == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out reading from socket");
-                }
-            }
-        } while (bytesRead == 0);
-
-        if (bytesRead == -1) {
-            return -1;
-        }
-
-        oneByteBuffer.flip();
-        return oneByteBuffer.get() & 0xFF;
-    }
-
-    @Override
-    public int read(final byte[] b) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, final int len) throws IOException {
-        if (bufferedByte != null) {
-            final byte retVal = bufferedByte;
-            bufferedByte = null;
-            b[off] = retVal;
-            return 1;
-        }
-
-        waitForReady();
-
-        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
-        final long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesRead;
-        do {
-            bytesRead = channel.read(buffer);
-            if (bytesRead == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out reading from socket");
-                }
-            }
-        } while (bytesRead == 0);
-
-        return bytesRead;
-    }
-
-    private void waitForReady() throws IOException {
-        int readyCount = readSelector.select(timeoutMillis);
-        if (readyCount < 1) {
-            if (interrupted) {
-                throw new TransmissionDisabledException();
-            }
-
-            throw new SocketTimeoutException("Timed out reading from socket");
-        }
-
-        final Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
-        selectedKeys.clear(); // clear the selected keys so that the Selector will be able to add them back to the ready set next time they are ready.
-    }
-
-    @Override
-    public int available() throws IOException {
-        if (bufferedByte != null) {
-            return 1;
-        }
-
-        isDataAvailable(); // attempt to read from socket
-        return (bufferedByte == null) ? 0 : 1;
-    }
-
-    public boolean isDataAvailable() throws IOException {
-        if (bufferedByte != null) {
-            return true;
-        }
-
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-        final int bytesRead = channel.read(oneByteBuffer);
-        if (bytesRead == -1) {
-            throw new EOFException("Peer has closed the stream");
-        }
-        if (bytesRead > 0) {
-            oneByteBuffer.flip();
-            bufferedByte = oneByteBuffer.get();
-            return true;
-        }
-        return false;
-    }
-
-    public void interrupt() {
-        interrupted = true;
-        readSelector.wakeup();
-    }
-
-    /**
-     * Closes the underlying socket channel.
-     *
-     * @throws java.io.IOException for issues closing underlying stream
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-        readSelector.close();
-    }
-}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
deleted file mode 100644
index 6a387a9e45..0000000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-public class SocketChannelOutputStream extends OutputStream {
-
-    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
-    private final SocketChannel channel;
-    private volatile int timeout = 30000;
-
-    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-
-    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
-        // this class expects a non-blocking channel
-        socketChannel.configureBlocking(false);
-        this.channel = socketChannel;
-    }
-
-    public void setTimeout(final int timeoutMillis) {
-        this.timeout = timeoutMillis;
-    }
-
-    @Override
-    public void write(final int b) throws IOException {
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-        oneByteBuffer.put((byte) b);
-        oneByteBuffer.flip();
-
-        final int timeoutMillis = this.timeout;
-        long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesWritten;
-
-        long sleepNanos = 1L;
-        while (oneByteBuffer.hasRemaining()) {
-            bytesWritten = channel.write(oneByteBuffer);
-            if (bytesWritten == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out writing to socket");
-                }
-
-                try {
-                    TimeUnit.NANOSECONDS.sleep(sleepNanos);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
-                }
-
-                sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
-            } else {
-                return;
-            }
-        }
-    }
-
-    @Override
-    public void write(final byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
-
-        final int timeoutMillis = this.timeout;
-        long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesWritten;
-
-        long sleepNanos = 1L;
-        while (buffer.hasRemaining()) {
-            bytesWritten = channel.write(buffer);
-            if (bytesWritten == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out writing to socket");
-                }
-
-                try {
-                    TimeUnit.NANOSECONDS.sleep(sleepNanos);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
-                }
-
-                sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
-            } else {
-                maxTime = System.currentTimeMillis() + timeoutMillis;
-            }
-        }
-    }
-
-    /**
-     * Closes the underlying SocketChannel
-     *
-     * @throws java.io.IOException if issues closing underlying stream
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
deleted file mode 100644
index 27a8807a3b..0000000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-//package nifi.remote.io.socket;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertTrue;
-//
-//import java.io.ByteArrayOutputStream;
-//import java.io.DataInputStream;
-//import java.io.DataOutputStream;
-//import java.io.IOException;
-//import java.io.InputStream;
-//import java.io.OutputStream;
-//import java.net.InetSocketAddress;
-//import java.net.ServerSocket;
-//import java.net.Socket;
-//import java.net.SocketTimeoutException;
-//import java.net.URI;
-//import java.net.URISyntaxException;
-//import java.nio.channels.SocketChannel;
-//import java.util.Arrays;
-//import java.util.concurrent.TimeUnit;
-//
-//import javax.net.ServerSocketFactory;
-//
-//import nifi.events.EventReporter;
-//import nifi.groups.RemoteProcessGroup;
-//import nifi.remote.RemoteGroupPort;
-//import nifi.remote.RemoteResourceFactory;
-//import nifi.remote.StandardSiteToSiteProtocol;
-//import nifi.remote.TransferDirection;
-//import nifi.remote.exception.HandshakeException;
-//import nifi.remote.exception.PortNotRunningException;
-//import nifi.remote.exception.UnknownPortException;
-//import nifi.remote.protocol.CommunicationsProtocol;
-//import nifi.remote.protocol.CommunicationsSession;
-//import nifi.util.NiFiProperties;
-//
-//import org.junit.Ignore;
-//import org.junit.Test;
-//import org.mockito.Mockito;
-//
-//@Ignore("For local testing only")
-//public class TestSocketChannelStreams {
-//    public static final int DATA_SIZE = 8 * 1024 * 1024;
-//
-//    @Test
-//    public void testSendingToLocalInstanceWithoutSSL() throws IOException, InterruptedException, HandshakeException, UnknownPortException, PortNotRunningException, URISyntaxException {
-//        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
-//        final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000));
-//        channel.configureBlocking(false);
-//
-//        final CommunicationsSession commsSession;
-//        commsSession = new SocketChannelCommunicationsSession(channel, "", null);
-//        commsSession.setUri("nifi://localhost:5000");
-//        final DataInputStream dis = new DataInputStream(commsSession.getRequest().getInputStream());
-//        final DataOutputStream dos = new DataOutputStream(commsSession.getResponse().getOutputStream());
-//
-//        dos.write(CommunicationsProtocol.MAGIC_BYTES);
-//        dos.flush();
-//
-//        final EventReporter eventReporter = Mockito.mock(EventReporter.class);
-//        final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, nifiProperties);
-//
-//        final StandardSiteToSiteProtocol negotiatedProtocol = (StandardSiteToSiteProtocol) RemoteResourceFactory.initiateResourceNegotiation(proposedProtocol, dis, dos);
-//        System.out.println(negotiatedProtocol);
-//
-//        final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class);
-//        Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000);
-//        Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") );
-//
-//        final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
-//        Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a");
-//        Mockito.when(port.getName()).thenReturn("Data In");
-//        Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg);
-//
-//        negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND);
-//    }
-//
-//    @Test
-//    public void testInputOutputStreams() throws IOException, InterruptedException {
-//        final ServerThread server = new ServerThread();
-//        server.start();
-//
-//        int port = server.getPort();
-//        while ( port <= 0 ) {
-//            Thread.sleep(10L);
-//            port = server.getPort();
-//        }
-//
-//        final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));
-//        channel.configureBlocking(false);
-//
-//        final OutputStream out = new SocketChannelOutputStream(channel);
-//        final InputStream in = new SocketChannelInputStream(channel);
-//        final DataInputStream dataIn = new DataInputStream(in);
-//
-//        final byte[] sent = new byte[DATA_SIZE];
-//        for (int i=0; i < sent.length; i++) {
-//            sent[i] = (byte) (i % 255);
-//        }
-//
-//        for (int itr=0; itr < 5; itr++) {
-//            final long start = System.nanoTime();
-//            out.write(sent);
-//            final long nanos = System.nanoTime() - start;
-//            final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-//            final float seconds = (float) millis / 1000F;
-//            final float megabytes = (float) DATA_SIZE / (1024F * 1024F);
-//            final float MBperS = megabytes / seconds;
-//            System.out.println("Millis: " + millis + "; MB/s: " + MBperS);
-//
-//            Thread.sleep(2500L);
-//            final byte[] received = server.getReceivedData();
-//            System.out.println("Server received " + received.length + " bytes");
-//            server.clearReceivedData();
-//            assertTrue(Arrays.equals(sent, received));
-//
-//            final long val = dataIn.readLong();
-//            assertEquals(DATA_SIZE, val);
-//            System.out.println(val);
-//        }
-//
-//        server.shutdown();
-//    }
-//
-//    public final long toLong(final byte[] buffer) throws IOException {
-//        return (((long)buffer[0] << 56) +
-//                ((long)(buffer[1] & 255) << 48) +
-//                ((long)(buffer[2] & 255) << 40) +
-//                ((long)(buffer[3] & 255) << 32) +
-//                ((long)(buffer[4] & 255) << 24) +
-//                ((buffer[5] & 255) << 16) +
-//                ((buffer[6] & 255) <<  8) +
-//                ((buffer[7] & 255) <<  0));
-//    }
-//
-//    private static class ServerThread extends Thread {
-//        private int listeningPort;
-//        private final ByteArrayOutputStream received = new ByteArrayOutputStream();
-//
-//        private volatile int readingDelay = 0;
-//        private volatile boolean shutdown = false;
-//
-//        public ServerThread() {
-//        }
-//
-//        public int getPort() {
-//            return listeningPort;
-//        }
-//
-//        public byte[] getReceivedData() {
-//            return received.toByteArray();
-//        }
-//
-//        @Override
-//        public void run() {
-//            try {
-//                final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
-//                final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0);
-//                this.listeningPort = serverSocket.getLocalPort();
-//
-//                final Socket socket = serverSocket.accept();
-//                final InputStream stream = socket.getInputStream();
-//                final DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
-//
-//                final byte[] buffer = new byte[4096];
-//                int len;
-//
-//                while (!shutdown) {
-//                    try {
-//                        len = stream.read(buffer);
-//
-//                        System.out.println("Received " + len + " bytes");
-//
-//                        if ( readingDelay > 0 ) {
-//                            try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {}
-//                        }
-//                    } catch (final SocketTimeoutException e) {
-//                        continue;
-//                    }
-//
-//                    if ( len < 0 ) {
-//                        return;
-//                    }
-//
-//                    received.write(buffer, 0, len);
-//
-//                    final long length = received.size();
-//                    if ( length % (DATA_SIZE) == 0 ) {
-//                        dos.writeLong(length);
-//                        dos.flush();
-//                    }
-//                }
-//
-//                System.out.println("Server successfully shutdown");
-//            } catch (final Exception e) {
-//                e.printStackTrace();
-//            }
-//        }
-//
-//        public void clearReceivedData() {
-//            this.received.reset();
-//        }
-//
-//        public void shutdown() {
-//            this.shutdown = true;
-//        }
-//
-//        public void delayReading(final int millis) {
-//            this.readingDelay = millis;
-//        }
-//    }
-//
-//}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
similarity index 69%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
index ce6718f6f5..4454984f31 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
@@ -17,21 +17,13 @@
 package org.apache.nifi.distributed.cache.operations;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Operation definition
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
-
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
-    }
-
-    public String value() {
-        return operation;
-    }
+public interface CacheOperation {
+    /**
+     * Get Cache Operation value used during protocol communication
+     *
+     * @return Cache Operation value
+     */
+    String value();
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
index 12177c09a7..e44edee5ed 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations;
 /**
  * Represents a distributed set cache operation which may be invoked.
  */
-public enum MapOperation {
+public enum MapOperation implements CacheOperation {
     CONTAINS_KEY("containsKey"),
     FETCH("fetch"),
     GET("get"),
@@ -41,6 +41,7 @@ public enum MapOperation {
         this.operation = operation;
     }
 
+    @Override
     public String value() {
         return operation;
     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
index ce6718f6f5..c07af160ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations;
 /**
  * Represents a distributed set cache operation which may be invoked.
  */
-public enum SetOperation {
+public enum SetOperation implements CacheOperation {
     ADD_IF_ABSENT("addIfAbsent"),
     CONTAINS("contains"),
     REMOVE("remove"),
@@ -31,6 +31,7 @@ public enum SetOperation {
         this.operation = operation;
     }
 
+    @Override
     public String value() {
         return operation;
     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
similarity index 82%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
index ce6718f6f5..23eef3271a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
@@ -17,20 +17,18 @@
 package org.apache.nifi.distributed.cache.operations;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Standard Cache Operation
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
+public enum StandardCacheOperation implements CacheOperation {
     CLOSE("close");
 
     private final String operation;
 
-    SetOperation(final String operation) {
+    StandardCacheOperation(final String operation) {
         this.operation = operation;
     }
 
+    @Override
     public String value() {
         return operation;
     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
index 5e42ee395b..a7ad1d5bc1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
@@ -30,6 +30,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-distributed-cache-protocol</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-event-transport</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
deleted file mode 100644
index fb3d597ed3..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractCacheServer implements CacheServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
-
-    private final String identifier;
-    private final int port;
-    private final int maxReadSize;
-    private final SSLContext sslContext;
-    protected volatile boolean stopped = false;
-    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
-
-    private volatile ServerSocketChannel serverSocketChannel;
-
-    public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxReadSize) {
-        this.identifier = identifier;
-        this.port = port;
-        this.sslContext = sslContext;
-        this.maxReadSize = maxReadSize;
-    }
-
-    @Override
-    public int getPort() {
-        return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort();
-    }
-
-    @Override
-    public void start() throws IOException {
-        serverSocketChannel = ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(true);
-        serverSocketChannel.bind(new InetSocketAddress(port));
-
-        final Runnable runnable = new Runnable() {
-
-            @Override
-            public void run() {
-                while (true) {
-                    final SocketChannel socketChannel;
-                    try {
-                        socketChannel = serverSocketChannel.accept();
-                        logger.debug("Connected to {}", new Object[]{socketChannel});
-                    } catch (final IOException e) {
-                        if (!stopped) {
-                            logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
-                            if (logger.isDebugEnabled()) {
-                                logger.error("", e);
-                            }
-                        }
-                        return;
-                    }
-
-                    final Runnable processInputRunnable = new Runnable() {
-                        @Override
-                        public void run() {
-                            final InputStream rawInputStream;
-                            final OutputStream rawOutputStream;
-                            final String peer = socketChannel.socket().getInetAddress().getHostName();
-
-                            try {
-                                if (sslContext == null) {
-                                    rawInputStream = new SocketChannelInputStream(socketChannel);
-                                    rawOutputStream = new SocketChannelOutputStream(socketChannel);
-                                } else {
-                                    final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
-                                    sslSocketChannel.connect();
-                                    rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
-                                    rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
-                                }
-                            } catch (final IOException e) {
-                                logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e);
-                                if (logger.isDebugEnabled()) {
-                                    logger.error("", e);
-                                }
-                                try {
-                                    socketChannel.close();
-                                } catch (final IOException swallow) {
-                                }
-
-                                return;
-                            }
-                            try (final InputStream in = new BufferedInputStream(rawInputStream);
-                                 final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
-
-                                final VersionNegotiator versionNegotiator = getVersionNegotiator();
-
-                                ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
-
-                                boolean continueComms = true;
-                                while (continueComms) {
-                                    continueComms = listen(in, out, versionNegotiator.getVersion());
-                                }
-                                // client has issued 'close'
-                                logger.debug("Client issued close on {}", new Object[]{socketChannel});
-                            } catch (final SocketTimeoutException e) {
-                                logger.debug("30 sec timeout reached", e);
-                            } catch (final IOException | HandshakeException e) {
-                                if (!stopped) {
-                                    logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()});
-                                    if (logger.isDebugEnabled()) {
-                                        logger.error("", e);
-                                    }
-                                }
-                            } finally {
-                                processInputThreads.remove(Thread.currentThread());
-                            }
-                        }
-                    };
-
-                    final Thread processInputThread = new Thread(processInputRunnable);
-                    processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
-                    processInputThread.setDaemon(true);
-                    processInputThread.start();
-                    processInputThreads.add(processInputThread);
-                }
-            }
-        };
-
-        final Thread thread = new Thread(runnable);
-        thread.setDaemon(true);
-        thread.setName("Distributed Cache Server: " + identifier);
-        thread.start();
-    }
-
-    /**
-     * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
-     * for details of each version enhancements.
-     */
-    protected StandardVersionNegotiator getVersionNegotiator() {
-        return new StandardVersionNegotiator(1);
-    }
-
-    @Override
-    public void stop() throws IOException {
-        stopped = true;
-        logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
-
-        if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
-            try {
-                serverSocketChannel.close();
-            } catch (final IOException e) {
-                logger.warn("Server Socket Close Failed", e);
-            }
-        }
-        // need to close out the created SocketChannels...this is done by interrupting
-        // the created threads that loop on listen().
-        for (final Thread processInputThread : processInputThreads) {
-            processInputThread.interrupt();
-            int i = 0;
-            while (!processInputThread.isInterrupted() && i++ < 5) {
-                try {
-                    Thread.sleep(50); // allow thread to gracefully terminate
-                } catch (final InterruptedException e) {
-                }
-            }
-        }
-        processInputThreads.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "CacheServer[id=" + identifier + "]";
-    }
-
-    /**
-     * Listens for incoming data and communicates with remote peer
-     *
-     * @param in in
-     * @param out out
-     * @param version version
-     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
-     * @throws IOException ex
-     */
-    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
-
-    /**
-     * Read a length-prefixed value from the {@link DataInputStream}.
-     *
-     * @param dis the {@link DataInputStream} from which to read the value
-     * @return the serialized representation of the value
-     * @throws IOException on failure to read from the input stream
-     */
-    protected byte[] readValue(final DataInputStream dis) throws IOException {
-        final int numBytes = validateSize(dis.readInt());
-        final byte[] buffer = new byte[numBytes];
-        dis.readFully(buffer);
-        return buffer;
-    }
-
-    /**
-     * Validate a size value received from the {@link DataInputStream} against the configured maximum.
-     *
-     * @param size the size value received from the {@link DataInputStream}
-     * @return the size value, iff it passes validation; otherwise, an exception is thrown
-     */
-    protected int validateSize(final int size) {
-        if (size <= maxReadSize) {
-            return size;
-        } else {
-            throw new IllegalStateException(String.format("Size [%d] exceeds maximum configured read [%d]", size, maxReadSize));
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
index cb81725f28..201ef93455 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -21,6 +21,7 @@ import javax.net.ssl.SSLContext;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.server.set.StandardSetCacheServer;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.ssl.SSLContextService;
 
@@ -38,12 +39,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
         final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
         final int maxReadSize = context.getProperty(MAX_READ_SIZE).asDataSize(DataUnit.B).intValue();
 
-        final SSLContext sslContext;
-        if (sslContextService == null) {
-            sslContext = null;
-        } else {
-            sslContext = sslContextService.createContext();
-        }
+        final SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();
 
         final EvictionPolicy evictionPolicy;
         switch (evictionPolicyName) {
@@ -63,7 +59,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
         try {
             final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
 
-            return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
+            return new StandardSetCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java
new file mode 100644
index 0000000000..059bbb59ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.util.ClientAuth;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetAddress;
+import java.util.Objects;
+
+/**
+ * Abstract Event Cache Server with standard lifecycle methods
+ */
+public abstract class EventCacheServer implements CacheServer {
+    private static final InetAddress ALL_ADDRESSES = null;
+
+    private final ComponentLog log;
+
+    private final int port;
+
+    // Assigned in start() and cleared in stop(): null indicates the server is not running
+    private EventServer eventServer;
+
+    public EventCacheServer(
+            final ComponentLog log,
+            final int port
+    ) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+        this.port = port;
+    }
+
+    /**
+     * Start Server using the Event Server returned from {@link #createEventServer()}
+     */
+    @Override
+    public void start() {
+        eventServer = createEventServer();
+        log.info("Started Cache Server Port [{}]", port);
+    }
+
+    /**
+     * Stop Server when running and release the reference to avoid repeated shutdown requests
+     */
+    @Override
+    public void stop() {
+        final EventServer runningServer = eventServer;
+        eventServer = null;
+        if (runningServer == null) {
+            log.info("Server not running");
+        } else {
+            runningServer.shutdown();
+            log.info("Stopped Cache Server Port [{}]", port);
+        }
+    }
+
+    /**
+     * Get Server Port Number provided to the constructor
+     *
+     * @return Configured Port Number
+     */
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Create Event Server Factory with standard properties
+     *
+     * @param identifier Component Identifier
+     * @param sslContext SSL Context is null when not configured
+     * @return Netty Event Server Factory
+     */
+    protected NettyEventServerFactory createEventServerFactory(final String identifier, final SSLContext sslContext) {
+        final NettyEventServerFactory eventServerFactory = new NettyEventServerFactory(ALL_ADDRESSES, port, TransportProtocol.TCP);
+        eventServerFactory.setSslContext(sslContext);
+        eventServerFactory.setClientAuth(ClientAuth.REQUIRED);
+
+        final String threadNamePrefix = String.format("%s[%s]", getClass().getSimpleName(), identifier);
+        eventServerFactory.setThreadNamePrefix(threadNamePrefix);
+
+        eventServerFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        eventServerFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+
+        return eventServerFactory;
+    }
+
+    /**
+     * Create Event Server
+     *
+     * @return Event Server
+     */
+    protected abstract EventServer createEventServer();
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
deleted file mode 100644
index 5e1fb03eec..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
-import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
-
-public class SetCacheServer extends AbstractCacheServer {
-
-    private final SetCache cache;
-
-    public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
-                          final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
-        super(identifier, sslContext, port, maxReadSize);
-
-        final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
-
-        if (persistencePath == null) {
-            this.cache = simpleCache;
-        } else {
-            final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
-            persistentCache.restore();
-            this.cache = persistentCache;
-        }
-    }
-
-    @Override
-    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-
-        final String action = dis.readUTF();
-        if (action.equals("close")) {
-            return false;
-        }
-
-        final byte[] value = readValue(dis);
-        final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
-
-        final SetCacheResult response;
-        switch (action) {
-            case "addIfAbsent":
-                response = cache.addIfAbsent(valueBuffer);
-                break;
-            case "contains":
-                response = cache.contains(valueBuffer);
-                break;
-            case "remove":
-                response = cache.remove(valueBuffer);
-                break;
-            default:
-                throw new IOException("IllegalRequest");
-        }
-
-        dos.writeBoolean(response.getResult());
-        dos.flush();
-
-        return true;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        try {
-            super.stop();
-        } finally {
-            cache.shutdown();
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        if (!stopped) {
-            stop();
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
similarity index 52%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
index ce6718f6f5..78b86235c4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
@@ -14,24 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Cache Operation Results
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
-
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
-    }
-
-    public String value() {
-        return operation;
+@ChannelHandler.Sharable
+public class CacheOperationResultEncoder extends MessageToByteEncoder<CacheOperationResult> {
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheOperationResult cacheOperationResult, final ByteBuf byteBuf) {
+        // Write a single status byte: 1 when the operation succeeded and 0 otherwise
+        byteBuf.writeBoolean(cacheOperationResult.isSuccess());
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java
new file mode 100644
index 0000000000..51b4bfa608
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.StandardCacheOperation;
+import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Cache Request Decoder processes bytes and decodes cache version and operation requests
+ * <p>
+ * Decoding proceeds in stages: a fixed-length header is skipped once, protocol version requests are decoded
+ * while the protocol version remains unset, and cache operation requests are decoded after
+ * {@link #setProtocolVersion(int)} has been called with a non-zero version.
+ */
+public class CacheRequestDecoder extends ByteToMessageDecoder {
+    private static final int HEADER_LENGTH = 4;
+
+    private static final int LONG_LENGTH = 8;
+
+    private static final int INT_LENGTH = 4;
+
+    private static final int SHORT_LENGTH = 2;
+
+    private final AtomicBoolean headerReceived = new AtomicBoolean();
+
+    private final AtomicInteger protocolVersion = new AtomicInteger();
+
+    private final ComponentLog log;
+
+    private final int maxLength;
+
+    private final CacheOperation[] supportedOperations;
+
+    /**
+     * Cache Request Decoder constructor
+     *
+     * @param log Component Log
+     * @param maxLength Maximum value accepted for integers and operation name lengths read from the buffer
+     * @param supportedOperations Cache Operations matched against the operation name sent by the client
+     */
+    public CacheRequestDecoder(
+            final ComponentLog log,
+            final int maxLength,
+            final CacheOperation[] supportedOperations
+    ) {
+        this.log = log;
+        this.maxLength = maxLength;
+        this.supportedOperations = supportedOperations;
+    }
+
+    /**
+     * Decode Byte Buffer reading header on initial connection followed by protocol version and cache operations
+     *
+     * @param channelHandlerContext Channel Handler Context
+     * @param byteBuf Byte Buffer
+     * @param objects Decoded Objects
+     */
+    @Override
+    protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) {
+        if (!headerReceived.get()) {
+            readHeader(byteBuf, channelHandlerContext.channel().remoteAddress());
+        }
+
+        if (protocolVersion.get() == 0) {
+            // Protocol version not yet set: interpret the next integer as the version requested by the client
+            final OptionalInt clientVersion = readInt(byteBuf);
+            if (clientVersion.isPresent()) {
+                final int clientVersionFound = clientVersion.getAsInt();
+                log.debug("Protocol Version [{}] Received [{}]", clientVersionFound, channelHandlerContext.channel().remoteAddress());
+                final CacheVersionRequest cacheVersionRequest = new CacheVersionRequest(clientVersionFound);
+                objects.add(cacheVersionRequest);
+            }
+        } else {
+            // Mark ByteBuf reader index to reset when sufficient bytes are not found
+            byteBuf.markReaderIndex();
+
+            final Optional<CacheOperation> cacheOperation = readOperation(byteBuf);
+            if (cacheOperation.isPresent()) {
+                final CacheOperation cacheOperationFound = cacheOperation.get();
+
+                final Optional<Object> cacheRequest = readRequest(cacheOperationFound, byteBuf);
+                if (cacheRequest.isPresent()) {
+                    final Object cacheRequestFound = cacheRequest.get();
+                    objects.add(cacheRequestFound);
+                } else if (StandardCacheOperation.CLOSE.value().contentEquals(cacheOperationFound.value())) {
+                    // Close operation is accepted without a request body
+                    objects.add(new CacheRequest(cacheOperationFound, null));
+                } else {
+                    byteBuf.resetReaderIndex();
+                    log.debug("Cache Operation [{}] request not processed", cacheOperationFound);
+                }
+            } else {
+                byteBuf.resetReaderIndex();
+            }
+        }
+    }
+
+    /**
+     * Log decoding failures and close the connection
+     *
+     * @param context Channel Handler Context
+     * @param cause Failure cause
+     */
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
+        log.warn("Request Decoding Failed: Closing Connection [{}]", context.channel().remoteAddress(), cause);
+        context.close();
+    }
+
+    /**
+     * Set Protocol Version based on version negotiated in other handlers
+     *
+     * @param protocolVersion Protocol Version
+     */
+    public void setProtocolVersion(final int protocolVersion) {
+        this.protocolVersion.getAndSet(protocolVersion);
+    }
+
+    /**
+     * Read Request Object based on Cache Operation
+     *
+     * @param cacheOperation Cache Operation
+     * @param byteBuf Byte Buffer
+     * @return Request Object or empty when buffer does not contain sufficient bytes
+     */
+    protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final Optional<byte[]> bytes = readBytes(byteBuf);
+        return bytes.map(value -> new CacheRequest(cacheOperation, value));
+    }
+
+    /**
+     * Read Bytes from buffer based on length indicated
+     *
+     * @param byteBuf Byte Buffer
+     * @return Bytes read or empty when buffer does not contain sufficient bytes
+     * @throws IllegalArgumentException Thrown when the indicated length is negative or exceeds the maximum length
+     */
+    protected Optional<byte[]> readBytes(final ByteBuf byteBuf) {
+        final OptionalInt length = readInt(byteBuf);
+        if (!length.isPresent()) {
+            return Optional.empty();
+        }
+
+        final int lengthFound = length.getAsInt();
+        if (lengthFound < 0) {
+            // Reject negative lengths explicitly instead of failing on array allocation
+            throw new IllegalArgumentException(String.format("Negative Length [%d] not allowed", lengthFound));
+        }
+
+        if (byteBuf.readableBytes() < lengthFound) {
+            return Optional.empty();
+        }
+
+        return Optional.of(readBytes(byteBuf, lengthFound));
+    }
+
+    /**
+     * Read UTF-8 String from buffer prefixed with an unsigned short length
+     *
+     * @param byteBuf Byte Buffer
+     * @return String or empty when buffer does not contain sufficient bytes
+     * @throws IllegalArgumentException Thrown when the indicated length exceeds the maximum length
+     */
+    protected Optional<String> readUnicodeString(final ByteBuf byteBuf) {
+        if (byteBuf.readableBytes() < SHORT_LENGTH) {
+            return Optional.empty();
+        }
+
+        final int length = byteBuf.readUnsignedShort();
+        if (length > maxLength) {
+            throw new IllegalArgumentException(String.format("Maximum Operation Length [%d] exceeded [%d]", maxLength, length));
+        }
+
+        if (byteBuf.readableBytes() < length) {
+            return Optional.empty();
+        }
+
+        return Optional.of(byteBuf.readCharSequence(length, StandardCharsets.UTF_8).toString());
+    }
+
+    /**
+     * Read Integer from buffer
+     *
+     * @param byteBuf Byte Buffer
+     * @return Integer or empty when buffer does not contain sufficient bytes
+     * @throws IllegalArgumentException Thrown when the integer exceeds the maximum length
+     */
+    protected OptionalInt readInt(final ByteBuf byteBuf) {
+        if (byteBuf.readableBytes() < INT_LENGTH) {
+            return OptionalInt.empty();
+        }
+
+        final int integer = byteBuf.readInt();
+        if (integer > maxLength) {
+            throw new IllegalArgumentException(String.format("Maximum Length [%d] exceeded [%d]", maxLength, integer));
+        }
+
+        return OptionalInt.of(integer);
+    }
+
+    /**
+     * Read Long from buffer
+     *
+     * @param byteBuf Byte Buffer
+     * @return Long or empty when buffer does not contain sufficient bytes
+     */
+    protected OptionalLong readLong(final ByteBuf byteBuf) {
+        final int readableBytes = byteBuf.readableBytes();
+        return readableBytes >= LONG_LENGTH ? OptionalLong.of(byteBuf.readLong()) : OptionalLong.empty();
+    }
+
+    private byte[] readBytes(final ByteBuf byteBuf, final int length) {
+        final byte[] bytes = new byte[length];
+        byteBuf.readBytes(bytes);
+        return bytes;
+    }
+
+    private Optional<CacheOperation> readOperation(final ByteBuf byteBuf) {
+        final Optional<String> clientOperation = readUnicodeString(byteBuf);
+
+        // Error message reports the length of the requested operation name, not the name itself
+        return clientOperation.map(operation -> Arrays.stream(supportedOperations)
+                .filter(supportedOperation -> supportedOperation.value().contentEquals(operation))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Cache Operation not supported [%d]", operation.length())))
+        );
+    }
+
+    private void readHeader(final ByteBuf byteBuf, final SocketAddress remoteAddress) {
+        if (byteBuf.readableBytes() >= HEADER_LENGTH) {
+            // Skip header bytes in place: readBytes(int) would allocate a new ByteBuf requiring release
+            byteBuf.skipBytes(HEADER_LENGTH);
+            headerReceived.getAndSet(true);
+            log.debug("Header Received [{}]", remoteAddress);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java
new file mode 100644
index 0000000000..4fc228f048
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Handler for Cache Version Requests responsible for negotiating supported version
+ */
+public class CacheVersionRequestHandler extends SimpleChannelInboundHandler<CacheVersionRequest> {
+    private final ComponentLog log;
+
+    private final VersionNegotiator versionNegotiator;
+
+    /**
+     * Cache Version Request Handler constructor
+     *
+     * @param log Component Log required
+     * @param versionNegotiator Version Negotiator required
+     */
+    public CacheVersionRequestHandler(final ComponentLog log, final VersionNegotiator versionNegotiator) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+        this.versionNegotiator = Objects.requireNonNull(versionNegotiator, "Version Negotiator required");
+    }
+
+    /**
+     * Evaluate the requested version for each Cache Request Decoder found in the pipeline and write the response
+     *
+     * @param channelHandlerContext Channel Handler Context
+     * @param cacheVersionRequest Cache Version Request containing the requested version
+     */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheVersionRequest cacheVersionRequest) {
+        for (final Map.Entry<String, ChannelHandler> pipelineEntry : channelHandlerContext.channel().pipeline()) {
+            final ChannelHandler pipelineHandler = pipelineEntry.getValue();
+            if (!(pipelineHandler instanceof CacheRequestDecoder)) {
+                continue;
+            }
+
+            final CacheRequestDecoder decoder = (CacheRequestDecoder) pipelineHandler;
+            final int requestedVersion = cacheVersionRequest.getVersion();
+            channelHandlerContext.writeAndFlush(handleVersion(decoder, requestedVersion));
+        }
+    }
+
+    /**
+     * Build the response for a requested version, setting the version on the decoder when supported
+     *
+     * @param cacheRequestDecoder Cache Request Decoder receiving the supported version
+     * @param requestedVersion Version requested by the client
+     * @return Cache Version Response with status and version
+     */
+    private CacheVersionResponse handleVersion(final CacheRequestDecoder cacheRequestDecoder, final int requestedVersion) {
+        if (versionNegotiator.isVersionSupported(requestedVersion)) {
+            log.debug("Cache Version Supported [{}]", requestedVersion);
+            cacheRequestDecoder.setProtocolVersion(requestedVersion);
+            return new CacheVersionResponse(ProtocolHandshake.RESOURCE_OK, requestedVersion);
+        }
+
+        final Integer preferredVersion = versionNegotiator.getPreferredVersion(requestedVersion);
+        if (preferredVersion != null) {
+            log.debug("Cache Version Rejected [{}] Preferred [{}]", requestedVersion, preferredVersion);
+            return new CacheVersionResponse(ProtocolHandshake.DIFFERENT_RESOURCE_VERSION, preferredVersion);
+        }
+
+        log.debug("Cache Version Rejected [{}]", requestedVersion);
+        return new CacheVersionResponse(ProtocolHandshake.ABORT, requestedVersion);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java
new file mode 100644
index 0000000000..76b0cda7a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
+
+/**
+ * Message Encoder for Cache Version Responses
+ */
+@ChannelHandler.Sharable
+public class CacheVersionResponseEncoder extends MessageToByteEncoder<CacheVersionResponse> {
+    /**
+     * Write the status code as one byte followed by the version as an integer only when the status
+     * indicates a different resource version
+     *
+     * @param channelHandlerContext Channel Handler Context
+     * @param cacheVersionResponse Cache Version Response to be encoded
+     * @param byteBuf Byte Buffer receiving the encoded response
+     */
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheVersionResponse cacheVersionResponse, final ByteBuf byteBuf) {
+        final int status = cacheVersionResponse.getStatusCode();
+        byteBuf.writeByte(status);
+
+        if (ProtocolHandshake.DIFFERENT_RESOURCE_VERSION != status) {
+            return;
+        }
+
+        byteBuf.writeInt(cacheVersionResponse.getVersion());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java
new file mode 100644
index 0000000000..f482d6912b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Map Cache Request Decoder extends the Cache Request Decoder and reads the arguments of each Map Cache Operation
+ * into a Map Cache Request
+ */
+public class MapCacheRequestDecoder extends CacheRequestDecoder {
+
+    /**
+     * Map Cache Request Decoder constructor passing all arguments to the parent decoder
+     *
+     * @param log Component Log
+     * @param maxLength Maximum length passed to the parent decoder
+     * @param supportedOperations Supported Cache Operations passed to the parent decoder
+     */
+    public MapCacheRequestDecoder(
+            final ComponentLog log,
+            final int maxLength,
+            final CacheOperation[] supportedOperations
+    ) {
+        super(log, maxLength, supportedOperations);
+    }
+
+    /**
+     * Read Map Cache Request based on the arguments expected for the Cache Operation
+     *
+     * @param cacheOperation Cache Operation
+     * @param byteBuf Byte Buffer
+     * @return Map Cache Request or empty when buffer does not contain sufficient bytes
+     */
+    @Override
+    protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final MapCacheRequest request;
+
+        if (MapOperation.CONTAINS_KEY == cacheOperation
+                || MapOperation.FETCH == cacheOperation
+                || MapOperation.GET == cacheOperation
+                || MapOperation.REMOVE == cacheOperation
+                || MapOperation.REMOVE_AND_GET == cacheOperation) {
+            request = readKeyRequest(cacheOperation, byteBuf);
+        } else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation
+                || MapOperation.PUT == cacheOperation
+                || MapOperation.PUT_IF_ABSENT == cacheOperation) {
+            request = readKeyValueRequest(cacheOperation, byteBuf);
+        } else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation
+                || MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
+            request = readPatternRequest(cacheOperation, byteBuf);
+        } else if (MapOperation.REPLACE == cacheOperation) {
+            request = readKeyRevisionValueRequest(cacheOperation, byteBuf);
+        } else if (MapOperation.SUBMAP == cacheOperation) {
+            request = readSubMapRequest(cacheOperation, byteBuf);
+        } else {
+            // KEYSET and any other operation: no arguments are read from the buffer
+            request = new MapCacheRequest(cacheOperation);
+        }
+
+        return Optional.ofNullable(request);
+    }
+
+    /**
+     * Read request containing a key
+     *
+     * @return Map Cache Request or null when buffer does not contain sufficient bytes
+     */
+    private MapCacheRequest readKeyRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final Optional<byte[]> key = readBytes(byteBuf);
+        return key.map(bytes -> new MapCacheRequest(cacheOperation, bytes)).orElse(null);
+    }
+
+    /**
+     * Read request containing a key followed by a value
+     *
+     * @return Map Cache Request or null when buffer does not contain sufficient bytes
+     */
+    private MapCacheRequest readKeyValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final Optional<byte[]> key = readBytes(byteBuf);
+        if (!key.isPresent()) {
+            return null;
+        }
+
+        final Optional<byte[]> value = readBytes(byteBuf);
+        return value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), valueBytes)).orElse(null);
+    }
+
+    /**
+     * Read request containing a key followed by a revision and a value
+     *
+     * @return Map Cache Request or null when buffer does not contain sufficient bytes
+     */
+    private MapCacheRequest readKeyRevisionValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final Optional<byte[]> key = readBytes(byteBuf);
+        if (!key.isPresent()) {
+            return null;
+        }
+
+        final OptionalLong revision = readLong(byteBuf);
+        if (!revision.isPresent()) {
+            return null;
+        }
+
+        final Optional<byte[]> value = readBytes(byteBuf);
+        return value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), revision.getAsLong(), valueBytes)).orElse(null);
+    }
+
+    /**
+     * Read request containing a pattern string
+     *
+     * @return Map Cache Request or null when buffer does not contain sufficient bytes
+     */
+    private MapCacheRequest readPatternRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final Optional<String> pattern = readUnicodeString(byteBuf);
+        final Optional<MapCacheRequest> request = pattern.map(requestedPattern -> new MapCacheRequest(cacheOperation, requestedPattern));
+        return request.orElse(null);
+    }
+
+    /**
+     * Read request containing a number of keys followed by each key
+     *
+     * @return Map Cache Request, with an empty list when zero keys are indicated, or null when buffer does not contain sufficient bytes
+     * @throws IllegalArgumentException Thrown when the indicated number of keys is negative
+     */
+    private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+        final OptionalInt keys = readInt(byteBuf);
+        if (!keys.isPresent()) {
+            return null;
+        }
+
+        final int keysExpected = keys.getAsInt();
+        if (keysExpected < 0) {
+            throw new IllegalArgumentException(String.format("Negative Number of Keys [%d] not allowed", keysExpected));
+        }
+
+        final List<byte[]> subMapKeys = new ArrayList<>();
+        for (int i = 0; i < keysExpected; i++) {
+            final Optional<byte[]> key = readBytes(byteBuf);
+            if (!key.isPresent()) {
+                // Return null on incomplete keys to retry on subsequent invocations
+                return null;
+            }
+            subMapKeys.add(key.get());
+        }
+
+        // Completed reads always produce a request: an empty list is valid and must not be treated as incomplete
+        return new MapCacheRequest(cacheOperation, subMapKeys);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java
new file mode 100644
index 0000000000..2674962d1d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.server.map.MapCache;
+import org.apache.nifi.distributed.cache.server.map.MapCacheRecord;
+import org.apache.nifi.distributed.cache.server.map.MapPutResult;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
+import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
+import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
+import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
+import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Handler for Map Cache Request operations interacts with the Map Cache and writes Results
+ */
+@ChannelHandler.Sharable
+public class MapCacheRequestHandler extends SimpleChannelInboundHandler<MapCacheRequest> {
+    private static final long REVISION_NOT_FOUND = -1;
+
+    private final ComponentLog log;
+
+    private final MapCache mapCache;
+
+    /**
+     * Map Cache Request Handler constructor
+     *
+     * @param log Component Log required
+     * @param mapCache Map Cache required
+     */
+    public MapCacheRequestHandler(final ComponentLog log, final MapCache mapCache) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+        this.mapCache = Objects.requireNonNull(mapCache, "Map Cache required");
+    }
+
+    /**
+     * Dispatch a Map Cache Request to the Map Cache based on the Cache Operation and write the result to the channel
+     *
+     * @param channelHandlerContext Channel Handler Context used for writing responses or closing the connection
+     * @param mapCacheRequest Map Cache Request containing the operation and its arguments
+     * @throws Exception Declared for failures raised while processing the request
+     */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final MapCacheRequest mapCacheRequest) throws Exception {
+        final CacheOperation cacheOperation = mapCacheRequest.getCacheOperation();
+
+        if (MapOperation.CLOSE == cacheOperation) {
+            // Close the connection without writing a response
+            log.debug("Map Cache Operation [{}] received", cacheOperation);
+            channelHandlerContext.close();
+        } else if (MapOperation.CONTAINS_KEY == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final boolean success = mapCache.containsKey(key);
+            writeResult(channelHandlerContext, cacheOperation, success);
+        } else if (MapOperation.GET == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer cached = mapCache.get(key);
+            writeBytes(channelHandlerContext, cacheOperation, cached);
+        } else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+            final MapPutResult result = mapCache.putIfAbsent(key, value);
+            // Successful put passes null to writeBytes; otherwise the existing value is written
+            final ByteBuffer cached = result.isSuccessful() ? null : result.getExisting().getValue();
+            writeBytes(channelHandlerContext, cacheOperation, cached);
+        } else if (MapOperation.FETCH == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final MapCacheRecord mapCacheRecord = mapCache.fetch(key);
+            writeMapCacheRecord(channelHandlerContext, cacheOperation, mapCacheRecord);
+        } else if (MapOperation.KEYSET == cacheOperation) {
+            // Write the number of keys followed by each key
+            final Set<ByteBuffer> keySet = mapCache.keySet();
+            writeSize(channelHandlerContext, cacheOperation, keySet.size());
+            for (final ByteBuffer key : keySet) {
+                writeBytes(channelHandlerContext, cacheOperation, key);
+            }
+        } else if (MapOperation.PUT == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+            final MapPutResult result = mapCache.put(key, value);
+            writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+        } else if (MapOperation.PUT_IF_ABSENT == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+            final MapPutResult result = mapCache.putIfAbsent(key, value);
+            writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+        } else if (MapOperation.REMOVE == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer removed = mapCache.remove(key);
+            // Success is reported when the cache returned a removed value
+            final boolean success = removed != null;
+            writeResult(channelHandlerContext, cacheOperation, success);
+        } else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer removed = mapCache.remove(key);
+            writeBytes(channelHandlerContext, cacheOperation, removed);
+        } else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
+            final String pattern = mapCacheRequest.getPattern();
+            final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
+            // A null result from the cache is reported as zero removed entries
+            final int size = removed == null ? 0 : removed.size();
+            writeRemoved(channelHandlerContext, cacheOperation, size);
+        } else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
+            final String pattern = mapCacheRequest.getPattern();
+            final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
+            // NOTE(review): the null path uses writeRemoved while the populated path uses writeSize;
+            // confirm both responses encode the count in the form expected by clients
+            if  (removed == null) {
+                writeRemoved(channelHandlerContext, cacheOperation, 0);
+            } else {
+                // Write the number of removed entries followed by each key and value
+                writeSize(channelHandlerContext, cacheOperation, removed.size());
+                for (final Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
+                    writeBytes(channelHandlerContext, cacheOperation, entry.getKey());
+                    writeBytes(channelHandlerContext, cacheOperation, entry.getValue());
+                }
+            }
+        } else if (MapOperation.REPLACE == cacheOperation) {
+            final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+            final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+            final MapCacheRecord mapCacheRecord = new MapCacheRecord(key, value, mapCacheRequest.getRevision());
+            final MapPutResult result = mapCache.replace(mapCacheRecord);
+            writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+        } else if (MapOperation.SUBMAP == cacheOperation) {
+            // Write one value for each requested key in the order of the requested keys
+            final List<byte[]> keys = mapCacheRequest.getKeys();
+            for (final byte[] key : keys) {
+                final ByteBuffer requestedKey = ByteBuffer.wrap(key);
+                final ByteBuffer value = mapCache.get(requestedKey);
+                writeBytes(channelHandlerContext, cacheOperation, value);
+            }
+        } else {
+            // Unsupported operations are logged and no response is written
+            log.warn("Map Cache Operation [{}] not supported", cacheOperation);
+        }
+    }
+
+    private void writeResult(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final boolean success) {
+        // Log the outcome, then write it to the channel as a Cache Operation Result
+        log.debug("Map Cache Operation [{}] Success [{}]", cacheOperation, success);
+        channelHandlerContext.writeAndFlush(new CacheOperationResult(success));
+    }
+
+    private void writeRemoved(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final long size) {
+        // Log the removed size, then write it to the channel as a Map Remove Response
+        log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
+        channelHandlerContext.writeAndFlush(new MapRemoveResponse(size));
+    }
+
+    private void writeSize(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final int size) {
+        // Log the size, then write it to the channel as a Map Size Response
+        log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
+        channelHandlerContext.writeAndFlush(new MapSizeResponse(size));
+    }
+
+    private void writeBytes(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final ByteBuffer buffer) {
+        // A null buffer is written as length zero with a null value
+        final byte[] value = (buffer != null) ? buffer.array() : null;
+        final int valueLength = (value != null) ? value.length : 0;
+        log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, valueLength);
+        channelHandlerContext.writeAndFlush(new MapValueResponse(valueLength, value));
+    }
+
+    private void writeMapCacheRecord(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final MapCacheRecord mapCacheRecord) {
+        // A null record is written with REVISION_NOT_FOUND, length zero and a null value
+        final long revision = (mapCacheRecord != null) ? mapCacheRecord.getRevision() : REVISION_NOT_FOUND;
+        final byte[] recordValue = (mapCacheRecord != null) ? mapCacheRecord.getValue().array() : null;
+        final int valueLength = (recordValue != null) ? recordValue.length : 0;
+        log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, valueLength);
+        channelHandlerContext.writeAndFlush(new MapValueResponse(valueLength, recordValue, revision));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
similarity index 54%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
index ce6718f6f5..44d7ce26d9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Map Remove Responses
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
-
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
-    }
-
-    public String value() {
-        return operation;
+@ChannelHandler.Sharable
+public class MapRemoveResponseEncoder extends MessageToByteEncoder<MapRemoveResponse> {
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, final MapRemoveResponse mapRemoveResponse, final ByteBuf byteBuf) {
+        byteBuf.writeLong(mapRemoveResponse.getSize()); // the encoded response is only the size, written as a long
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
similarity index 55%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
index ce6718f6f5..d6fa60dd9c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Map Size Responses
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
-
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
-    }
-
-    public String value() {
-        return operation;
+@ChannelHandler.Sharable
+public class MapSizeResponseEncoder extends MessageToByteEncoder<MapSizeResponse> {
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, final MapSizeResponse mapSizeResponse, final ByteBuf byteBuf) {
+        byteBuf.writeInt(mapSizeResponse.getSize()); // the encoded response is only the size, written as an int
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java
new file mode 100644
index 0000000000..a0561a31bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
+
+/**
+ * Message Encoder for Map Value Responses
+ */
+@ChannelHandler.Sharable
+public class MapValueResponseEncoder extends MessageToByteEncoder<MapValueResponse> {
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, final MapValueResponse mapValueResponse, final ByteBuf byteBuf) {
+        // Layout: revision when present, then length, then value bytes when present
+        final Long revision = mapValueResponse.getRevision();
+        if (revision != null) {
+            byteBuf.writeLong(revision.longValue());
+        }
+        byteBuf.writeInt(mapValueResponse.getLength());
+        final byte[] valueBytes = mapValueResponse.getValue();
+        if (valueBytes != null) {
+            byteBuf.writeBytes(valueBytes);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java
new file mode 100644
index 0000000000..0b0987778d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.SetOperation;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
+import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
+import org.apache.nifi.distributed.cache.server.set.SetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Handler for Set Cache Requests that interacts with the Set Cache and writes Results
+ */
+@ChannelHandler.Sharable
+public class SetCacheRequestHandler extends SimpleChannelInboundHandler<CacheRequest> {
+    private final ComponentLog log;
+
+    private final SetCache setCache;
+
+    public SetCacheRequestHandler(final ComponentLog log, final SetCache setCache) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+        this.setCache = Objects.requireNonNull(setCache, "Set Cache required");
+    }
+
+    /** Run the requested Set Cache Operation and write its result, or close the channel when CLOSE is requested */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheRequest cacheRequest) throws Exception {
+        final CacheOperation cacheOperation = cacheRequest.getCacheOperation();
+
+        // CLOSE is handled before reading the body so a CLOSE request without a body cannot fail in ByteBuffer.wrap()
+        if (SetOperation.CLOSE == cacheOperation) {
+            log.debug("Set Cache Operation [{}] received", cacheOperation);
+            channelHandlerContext.close();
+            return;
+        }
+
+        final ByteBuffer body = ByteBuffer.wrap(cacheRequest.getBody());
+        final SetCacheResult setCacheResult;
+        if (SetOperation.ADD_IF_ABSENT == cacheOperation) {
+            setCacheResult = setCache.addIfAbsent(body);
+        } else if (SetOperation.CONTAINS == cacheOperation) {
+            setCacheResult = setCache.contains(body);
+        } else if (SetOperation.REMOVE == cacheOperation) {
+            setCacheResult = setCache.remove(body);
+        } else {
+            log.warn("Set Cache Operation [{}] not supported", cacheOperation);
+            return;
+        }
+
+        final CacheOperationResult result = getCacheOperationResult(setCacheResult);
+        log.debug("Set Cache Operation [{}] Success [{}]", cacheOperation, result.isSuccess());
+        channelHandlerContext.writeAndFlush(result);
+    }
+
+    private CacheOperationResult getCacheOperationResult(final SetCacheResult setCacheResult) {
+        return new CacheOperationResult(setCacheResult.getResult());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
index 7987e1485c..8c3a83a485 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -75,10 +75,23 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
         }
     }
 
-    protected MapCacheServer createMapCacheServer(
-            final int port, final int maxSize, final SSLContext sslContext, final EvictionPolicy evictionPolicy,
-            final File persistenceDir, final int maxReadSize) throws IOException {
-        return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
+    /**
+     * Create the Map Cache Server using the logger and identifier of this component
+     *
+     * @param port port number passed to the server
+     * @param maxSize maximum size passed to the server
+     * @param sslContext SSL Context passed to the server
+     * @param evictionPolicy Eviction Policy passed to the server
+     * @param persistenceDir persistence directory passed to the server
+     * @param maxReadSize maximum read size passed to the server
+     * @return new Standard Map Cache Server
+     * @throws IOException if creating the server fails (presumed; constructor not shown here)
+     */
+    protected CacheServer createMapCacheServer(
+            final int port, final int maxSize, final SSLContext sslContext, final EvictionPolicy evictionPolicy,
+            final File persistenceDir, final int maxReadSize) throws IOException {
+        return new StandardMapCacheServer(
+                getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize
+        );
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
deleted file mode 100644
index 8986183540..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-
-public class MapCacheServer extends AbstractCacheServer {
-
-    private final MapCache cache;
-
-    public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
-            final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
-        super(identifier, sslContext, port, maxReadSize);
-
-        final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
-
-        if (persistencePath == null) {
-            this.cache = simpleCache;
-        } else {
-            final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
-            persistentCache.restore();
-            this.cache = persistentCache;
-        }
-    }
-
-    /**
-     * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
-     * for details of each version enhancements.
-     */
-    protected StandardVersionNegotiator getVersionNegotiator() {
-        return new StandardVersionNegotiator(3, 2, 1);
-    }
-
-    @Override
-    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-        final String action = dis.readUTF();
-        try {
-            switch (action) {
-            case "close": {
-                return false;
-            }
-            case "putIfAbsent": {
-                final byte[] key = readValue(dis);
-                final byte[] value = readValue(dis);
-                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-                dos.writeBoolean(putResult.isSuccessful());
-                break;
-            }
-            case "put": {
-                final byte[] key = readValue(dis);
-                final byte[] value = readValue(dis);
-                cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-                dos.writeBoolean(true);
-                break;
-            }
-            case "containsKey": {
-                final byte[] key = readValue(dis);
-                final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
-                dos.writeBoolean(contains);
-                break;
-            }
-            case "getAndPutIfAbsent": {
-                final byte[] key = readValue(dis);
-                final byte[] value = readValue(dis);
-
-                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
-                if (putResult.isSuccessful()) {
-                    // Put was successful. There was no old value to get.
-                    dos.writeInt(0);
-                } else {
-                    // we didn't put. Write back the previous value
-                    final byte[] byteArray = putResult.getExisting().getValue().array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-
-                break;
-            }
-            case "get": {
-                final byte[] key = readValue(dis);
-                final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
-                if (existingValue == null) {
-                    // there was no existing value.
-                    dos.writeInt(0);
-                } else {
-                    // a value already existed.
-                    final byte[] byteArray = existingValue.array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-
-                break;
-            }
-            case "subMap": {
-                final int numKeys = validateSize(dis.readInt());
-                for(int i=0;i<numKeys;i++) {
-                    final byte[] key = readValue(dis);
-                    final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
-                    if (existingValue == null) {
-                        // there was no existing value.
-                        dos.writeInt(0);
-                    } else {
-                        // a value already existed.
-                        final byte[] byteArray = existingValue.array();
-                        dos.writeInt(byteArray.length);
-                        dos.write(byteArray);
-                    }
-                }
-                break;
-            }
-            case "remove": {
-                final byte[] key = readValue(dis);
-                final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
-                dos.writeBoolean(removed);
-                break;
-            }
-            case "removeAndGet": {
-                final byte[] key = readValue(dis);
-                final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key));
-                if (removed == null) {
-                    // there was no value removed
-                    dos.writeInt(0);
-                } else {
-                    // reply with the value that was removed
-                    final byte[] byteArray = removed.array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-                break;
-            }
-            case "removeByPattern": {
-                final String pattern = dis.readUTF();
-                final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
-                dos.writeLong(removed == null ? 0 : removed.size());
-                break;
-            }
-            case "removeByPatternAndGet": {
-                final String pattern = dis.readUTF();
-                final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
-                if (removed == null || removed.size() == 0) {
-                    dos.writeLong(0);
-                } else {
-                    // write the map size
-                    dos.writeInt(removed.size());
-                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
-                        // write map entry key
-                        final byte[] key = entry.getKey().array();
-                        dos.writeInt(key.length);
-                        dos.write(key);
-                        // write map entry value
-                        final byte[] value = entry.getValue().array();
-                        dos.writeInt(value.length);
-                        dos.write(value);
-                    }
-                }
-                break;
-            }
-            case "fetch": {
-                final byte[] key = readValue(dis);
-                final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
-                if (existing == null) {
-                    // there was no existing value.
-                    dos.writeLong(-1);
-                    dos.writeInt(0);
-                } else {
-                    // a value already existed.
-                    dos.writeLong(existing.getRevision());
-                    final byte[] byteArray = existing.getValue().array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-
-                break;
-            }
-            case "replace": {
-                final byte[] key = readValue(dis);
-                final long revision = dis.readLong();
-                final byte[] value = readValue(dis);
-                final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
-                dos.writeBoolean(result.isSuccessful());
-                break;
-            }
-            case "keySet": {
-                final Set<ByteBuffer> result = cache.keySet();
-                // write the set size
-                dos.writeInt(result.size());
-                // write each key in the set
-                for (ByteBuffer bb : result) {
-                    final byte[] byteArray = bb.array();
-                    dos.writeInt(byteArray.length);
-                    dos.write(byteArray);
-                }
-                break;
-            }
-            default: {
-                throw new IOException("Illegal Request");
-            }
-            }
-        } finally {
-            dos.flush();
-        }
-
-        return true;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        try {
-            super.stop();
-        } finally {
-            cache.shutdown();
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        if (!stopped) {
-            stop();
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java
new file mode 100644
index 0000000000..f5e57fd5fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EventCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.MapRemoveResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapSizeResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapValueResponseEncoder;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.EventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+
+/**
+ * Standard Map Cache Server implemented using Netty, backed by a simple Map Cache with optional persistence
+ */
+public class StandardMapCacheServer extends EventCacheServer {
+    private final EventServerFactory eventServerFactory; // configured in the constructor, used by createEventServer()
+
+    private final MapCache cache; // the simple cache, or the persistent wrapper when a persistence path is given
+
+    public StandardMapCacheServer(
+            final ComponentLog log,
+            final String identifier,
+            final SSLContext sslContext,
+            final int port,
+            final int maxCacheEntries,
+            final EvictionPolicy evictionPolicy,
+            final File persistencePath, // null selects the simple cache without the persistent wrapper
+            final int maxReadLength // handed to each MapCacheRequestDecoder instance
+    ) throws IOException {
+        super(log, port);
+
+        final MapCache simpleCache = new SimpleMapCache(identifier, maxCacheEntries, evictionPolicy);
+
+        if (persistencePath == null) {
+            this.cache = simpleCache;
+        } else {
+            final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache); // wraps the simple cache
+            persistentCache.restore(); // runs before the cache is assigned and before any handler is created
+            this.cache = persistentCache;
+        }
+
+        final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
+
+        // Create Sharable Handlers to avoid unnecessary instantiation for each connection
+        final MapCacheRequestHandler mapCacheRequestHandler = new MapCacheRequestHandler(log, cache);
+        final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
+        final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
+        final MapRemoveResponseEncoder mapRemoveResponseEncoder = new MapRemoveResponseEncoder();
+        final MapSizeResponseEncoder mapSizeResponseEncoder = new MapSizeResponseEncoder();
+        final MapValueResponseEncoder mapValueResponseEncoder = new MapValueResponseEncoder();
+
+        final VersionNegotiator versionNegotiator = createVersionNegotiator(); // NOTE(review): overridable method called from the constructor; an override runs before subclass fields are initialized
+        nettyEventServerFactory.setHandlerSupplier(() -> // each supplier call returns a new list: the shared instances above plus two new handlers
+                Arrays.asList(
+                        cacheVersionResponseEncoder,
+                        cacheOperationResultEncoder,
+                        mapRemoveResponseEncoder,
+                        mapSizeResponseEncoder,
+                        mapValueResponseEncoder,
+                        new MapCacheRequestDecoder(log, maxReadLength, MapOperation.values()), // not shared: constructed on every supplier call
+                        mapCacheRequestHandler,
+                        new CacheVersionRequestHandler(log, versionNegotiator) // constructed on every supplier call, reusing the single negotiator instance
+                )
+        );
+
+        this.eventServerFactory = nettyEventServerFactory;
+    }
+
+    @Override
+    public void stop() {
+        try {
+            cache.shutdown(); // the cache is shut down before the server is stopped
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Cache Shutdown Failed", e); // checked exception rethrown unchecked with the cause preserved
+        } finally {
+            super.stop(); // in finally so the server stop still runs when cache shutdown throws
+        }
+    }
+
+    @Override
+    protected EventServer createEventServer() {
+        return eventServerFactory.getEventServer(); // delegates to the factory configured in the constructor
+    }
+
+    protected VersionNegotiator createVersionNegotiator() { // protected and non-final, so a subclass can supply different versions
+        return new StandardVersionNegotiator(
+                ProtocolVersion.V3.value(), // listed from V3 down to V1; presumably the order of preference - confirm against StandardVersionNegotiator
+                ProtocolVersion.V2.value(),
+                ProtocolVersion.V1.value()
+        );
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
similarity index 65%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
index ce6718f6f5..770deed4f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
@@ -14,24 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
 
-/**
- * Represents a distributed set cache operation which may be invoked.
- */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
-
-    private final String operation;
+public class CacheOperationResult {
+    private final boolean success;
 
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public CacheOperationResult(
+            final boolean success
+    ) {
+        this.success = success;
     }
 
-    public String value() {
-        return operation;
+    public boolean isSuccess() {
+        return success;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
similarity index 54%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
index ce6718f6f5..9bd6793822 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
@@ -14,24 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
+
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+
+import java.util.Objects;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Request Packet
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
+public class CacheRequest {
+    private final CacheOperation cacheOperation;
 
-    private final String operation;
+    private final byte[] body;
+
+    public CacheRequest(
+            final CacheOperation cacheOperation,
+            final byte[] body
+    ) {
+        this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
+        this.body = Objects.requireNonNull(body, "Body required");
+    }
 
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public CacheOperation getCacheOperation() {
+        return cacheOperation;
     }
 
-    public String value() {
-        return operation;
+    public byte[] getBody() {
+        return body;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
index ce6718f6f5..9c69b573d0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Version Request contains the protocol version which the peer requested
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
+public class CacheVersionRequest {
+    private final int version;
 
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public CacheVersionRequest(
+            final int version
+    ) {
+        this.version = version;
     }
 
-    public String value() {
-        return operation;
+    public int getVersion() {
+        return version;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
similarity index 60%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
index ce6718f6f5..724803c203 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
@@ -14,24 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Version Response contains the status code and optional requested version
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
+public class CacheVersionResponse {
+    private final int statusCode;
 
-    private final String operation;
+    private final int version;
 
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public CacheVersionResponse(
+            final int statusCode,
+            final int version
+    ) {
+        this.statusCode = statusCode;
+        this.version = version;
     }
 
-    public String value() {
-        return operation;
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public int getVersion() {
+        return version;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java
new file mode 100644
index 0000000000..0f6fe97543
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.protocol;
+
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Map Cache Request with a required operation and other optional properties; each constructor sets only the properties it receives
+ */
+public class MapCacheRequest {
+    private final CacheOperation cacheOperation; // required: every constructor rejects null
+
+    private byte[] key; // stays null unless a constructor taking a key is used
+
+    private byte[] value; // stays null unless a constructor taking a value is used
+
+    private String pattern; // stays null unless the pattern constructor is used
+
+    private long revision; // stays 0 unless the constructor taking a revision is used
+
+    private List<byte[]> keys = Collections.emptyList(); // empty list unless the keys constructor is used
+
+    public MapCacheRequest( // operation only
+            final CacheOperation cacheOperation
+    ) {
+        this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
+    }
+
+    public MapCacheRequest( // operation and a single key
+            final CacheOperation cacheOperation,
+            final byte[] key
+    ) {
+        this(cacheOperation);
+        this.key = Objects.requireNonNull(key, "Key required"); // the array reference is stored without copying
+    }
+
+    public MapCacheRequest( // operation, key and value
+            final CacheOperation cacheOperation,
+            final byte[] key,
+            final byte[] value
+    ) {
+        this(cacheOperation, key);
+        this.value = Objects.requireNonNull(value, "Value required"); // the array reference is stored without copying
+    }
+
+    public MapCacheRequest( // operation, key, revision and value; note that revision precedes value
+            final CacheOperation cacheOperation,
+            final byte[] key,
+            final long revision,
+            final byte[] value
+    ) {
+        this(cacheOperation, key, value);
+        this.revision = revision;
+    }
+
+    public MapCacheRequest( // operation and a pattern string
+            final CacheOperation cacheOperation,
+            final String pattern
+    ) {
+        this(cacheOperation);
+        this.pattern = Objects.requireNonNull(pattern, "Pattern required");
+    }
+
+    public MapCacheRequest( // operation and a list of keys
+            final CacheOperation cacheOperation,
+            final List<byte[]> keys
+    ) {
+        this(cacheOperation);
+        this.keys = Objects.requireNonNull(keys, "Keys required"); // the list reference is stored without copying
+    }
+
+    public CacheOperation getCacheOperation() {
+        return cacheOperation;
+    }
+
+    public byte[] getKey() {
+        return key; // internal array returned without copying; null when no key was supplied
+    }
+
+    public byte[] getValue() {
+        return value; // internal array returned without copying; null when no value was supplied
+    }
+
+    public String getPattern() {
+        return pattern; // null when no pattern was supplied
+    }
+
+    public long getRevision() {
+        return revision; // 0 when no revision was supplied
+    }
+
+    public List<byte[]> getKeys() {
+        return keys; // never null: defaults to an empty list
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
index ce6718f6f5..f5a265cb18 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Map Remove Response
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
+public class MapRemoveResponse {
+    private final long size;
 
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public MapRemoveResponse(
+            final long size
+    ) {
+        this.size = size;
     }
 
-    public String value() {
-        return operation;
+    public long getSize() {
+        return size;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
index ce6718f6f5..9c124dc7a4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
 
 /**
- * Represents a distributed set cache operation which may be invoked.
+ * Map Size Response
  */
-public enum SetOperation {
-    ADD_IF_ABSENT("addIfAbsent"),
-    CONTAINS("contains"),
-    REMOVE("remove"),
-    CLOSE("close");
+public class MapSizeResponse {
+    private final int size;
 
-    private final String operation;
-
-    SetOperation(final String operation) {
-        this.operation = operation;
+    public MapSizeResponse(
+            final int size
+    ) {
+        this.size = size;
     }
 
-    public String value() {
-        return operation;
+    public int getSize() {
+        return size;
     }
 }
diff --git a/nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
similarity index 50%
rename from nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
rename to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
index 262cf54f61..6b52908f78 100644
--- a/nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
@@ -14,39 +14,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.remote.io.socket.ssl;
+package org.apache.nifi.distributed.cache.server.protocol;
 
-import java.io.IOException;
-import java.io.OutputStream;
+/**
+ * Map Value Response
+ */
+public class MapValueResponse {
+    private final int length;
 
-public class SSLSocketChannelOutputStream extends OutputStream {
+    private final byte[] value;
 
-    private final SSLSocketChannel channel;
+    private Long revision;
 
-    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
-        this.channel = channel;
+    public MapValueResponse(
+            final int length,
+            final byte[] value
+    ) {
+        this.length = length;
+        this.value = value;
     }
 
-    @Override
-    public void write(final int b) throws IOException {
-        channel.write(b);
+    public MapValueResponse(
+            final int length,
+            final byte[] value,
+            final Long revision
+    ) {
+        this(length, value);
+        this.revision = revision;
     }
 
-    @Override
-    public void write(byte[] b) throws IOException {
-        channel.write(b);
+    public int getLength() {
+        return length;
     }
 
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        channel.write(b, off, len);
+    public byte[] getValue() {
+        return value;
     }
 
-    /**
-     * Closes the underlying SSLSocketChannel, which also will close the InputStream and the connection
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
+    public Long getRevision() {
+        return revision;
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java
new file mode 100644
index 0000000000..d11b941f60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import org.apache.nifi.distributed.cache.operations.SetOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EventCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheRequestDecoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.SetCacheRequestHandler;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.EventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+
+/**
+ * Standard Set Cache Server implementation based on Netty, backed by a simple Set Cache with optional persistence
+ */
+public class StandardSetCacheServer extends EventCacheServer {
+    private final EventServerFactory eventServerFactory; // configured in the constructor, used by createEventServer()
+
+    private final SetCache cache; // the simple cache, or the persistent wrapper when a persistence path is given
+
+    public StandardSetCacheServer(
+            final ComponentLog log,
+            final String identifier,
+            final SSLContext sslContext,
+            final int port,
+            final int maxCacheEntries,
+            final EvictionPolicy evictionPolicy,
+            final File persistencePath, // null selects the simple cache without the persistent wrapper
+            final int maxReadLength // handed to each CacheRequestDecoder instance
+    ) throws IOException {
+        super(log, port);
+
+        final SetCache simpleCache = new SimpleSetCache(identifier, maxCacheEntries, evictionPolicy);
+
+        if (persistencePath == null) {
+            this.cache = simpleCache;
+        } else {
+            final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache); // wraps the simple cache
+            persistentCache.restore(); // runs before the cache is assigned and before any handler is created
+            this.cache = persistentCache;
+        }
+
+        final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
+
+        // Create Sharable Handlers to avoid unnecessary instantiation for each connection
+        final SetCacheRequestHandler setCacheRequestHandler = new SetCacheRequestHandler(log, cache);
+        final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
+        final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
+
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V1.value()); // V1 is the only version passed to the negotiator
+        nettyEventServerFactory.setHandlerSupplier(() -> // each supplier call returns a new list: the shared instances above plus two new handlers
+                Arrays.asList(
+                        cacheVersionResponseEncoder,
+                        cacheOperationResultEncoder,
+                        new CacheRequestDecoder(log, maxReadLength, SetOperation.values()), // not shared: constructed on every supplier call
+                        setCacheRequestHandler,
+                        new CacheVersionRequestHandler(log, versionNegotiator) // constructed on every supplier call, reusing the single negotiator instance
+                )
+        );
+
+        this.eventServerFactory = nettyEventServerFactory;
+    }
+
+    @Override
+    public void stop() {
+        try {
+            cache.shutdown(); // the cache is shut down before the server is stopped
+        } catch (final IOException e) {
+            throw new UncheckedIOException("Cache Shutdown Failed", e); // checked exception rethrown unchecked with the cause preserved
+        } finally {
+            super.stop(); // in finally so the server stop still runs when cache shutdown throws
+        }
+    }
+
+    @Override
+    protected EventServer createEventServer() {
+        return eventServerFactory.getEventServer(); // delegates to the factory configured in the constructor
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
index 42e9f568dc..d292159425 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
@@ -26,9 +26,10 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -39,18 +40,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Verify basic functionality of {@link DistributedMapCacheClientService}.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol.  It assumes that the default distributed cache port (4557)
- * is available.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(5)
 public class DistributedMapCacheTest {
 
     private static TestRunner runner = null;
@@ -59,8 +54,8 @@ public class DistributedMapCacheTest {
     private static final Serializer<String> serializer = new StringSerializer();
     private static final Deserializer<String> deserializer = new StringDeserializer();
 
-    @BeforeClass
-    public static void beforeClass() throws Exception {
+    @BeforeAll
+    public static void startServices() throws Exception {
         final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
 
@@ -76,8 +71,8 @@ public class DistributedMapCacheTest {
         runner.enableControllerService(client);
     }
 
-    @AfterClass
-    public static void afterClass() {
+    @AfterAll
+    public static void shutdownServices() {
         runner.disableControllerService(client);
         runner.removeControllerService(client);
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
index 9fddfaf97b..654468f817 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
@@ -29,9 +29,9 @@ import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import javax.net.ssl.SSLContext;
@@ -40,18 +40,10 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/**
- * Verify basic functionality of {@link DistributedMapCacheClientService}, in the context of a TLS authenticated
- * socket session.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol.  It assumes that the default distributed cache port (4557)
- * is available.
- */
 public class DistributedMapCacheTlsTest {
 
     private static TestRunner runner = null;
@@ -61,8 +53,8 @@ public class DistributedMapCacheTlsTest {
     private static final Serializer<String> serializer = new StringSerializer();
     private static final Deserializer<String> deserializer = new StringDeserializer();
 
-    @BeforeClass
-    public static void beforeClass() throws Exception {
+    @BeforeAll
+    public static void setServices() throws Exception {
         final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
         sslContextService = createSslContextService();
@@ -83,8 +75,8 @@ public class DistributedMapCacheTlsTest {
         runner.enableControllerService(client);
     }
 
-    @AfterClass
-    public static void afterClass() {
+    @AfterAll
+    public static void shutdown() {
         runner.disableControllerService(client);
         runner.removeControllerService(client);
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java
new file mode 100644
index 0000000000..12b6f0e51e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLContext;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(5)
+@ExtendWith(MockitoExtension.class)
+class StandardMapCacheServerTest {
+    // Drives StandardMapCacheServer over a plain socket, writing the wire protocol by hand rather than through a cache client
+    private static final String IDENTIFIER = StandardMapCacheServer.class.getSimpleName();
+
+    private static final SSLContext SSL_CONTEXT_DISABLED = null;
+
+    private static final int MAX_CACHE_ENTRIES = 32;
+
+    private static final EvictionPolicy EVICTION_POLICY = EvictionPolicy.FIFO;
+
+    private static final File PERSISTENCE_PATH_DISABLED = null;
+
+    private static final int MAX_READ_LENGTH = 4096;
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private static final byte[] HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+    private static final byte[] KEY = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);
+
+    private static final int KEY_NOT_FOUND = 0;
+    private static final int PUT_COMPLETED = 1;
+
+    @Mock
+    ComponentLog log;
+
+    StandardMapCacheServer server;
+
+    @BeforeEach
+    void setServer() throws IOException {
+        final int port = NetworkUtils.getAvailableTcpPort();
+        server = new StandardMapCacheServer(
+                log,
+                IDENTIFIER,
+                SSL_CONTEXT_DISABLED,
+                port,
+                MAX_CACHE_ENTRIES,
+                EVICTION_POLICY,
+                PERSISTENCE_PATH_DISABLED,
+                MAX_READ_LENGTH
+        );
+        server.start();
+    }
+
+    @AfterEach
+    void stopServer() {
+        server.stop();
+    }
+
+    @Test
+    void testSocketContainsKeyValueDelayed() throws IOException, InterruptedException {
+        try (
+                final Socket socket = new Socket(LOCALHOST, server.getPort());
+                final InputStream inputStream = socket.getInputStream();
+                final OutputStream outputStream = socket.getOutputStream();
+                final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
+        ) {
+            sendHeaderVersion(dataOutputStream, inputStream);
+
+            dataOutputStream.writeUTF(MapOperation.CONTAINS_KEY.value());
+
+            // Delay writing key to simulate slow network connection
+            TimeUnit.MILLISECONDS.sleep(200);
+
+            dataOutputStream.writeInt(KEY.length);
+            dataOutputStream.write(KEY);
+
+            final int read = inputStream.read();
+            assertEquals(KEY_NOT_FOUND, read);
+        }
+    }
+
+    @Test
+    void testSocketPutGetMaxLength() throws IOException {
+        try (
+                final Socket socket = new Socket(LOCALHOST, server.getPort());
+                final InputStream inputStream = socket.getInputStream();
+                final DataInputStream dataInputStream = new DataInputStream(inputStream);
+                final OutputStream outputStream = socket.getOutputStream();
+                final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
+        ) {
+            sendHeaderVersion(dataOutputStream, inputStream);
+
+            dataOutputStream.writeUTF(MapOperation.PUT.value());
+
+            dataOutputStream.writeInt(KEY.length);
+            dataOutputStream.write(KEY);
+
+            final byte[] value = getValue();
+            dataOutputStream.writeInt(value.length);
+            dataOutputStream.write(value);
+
+            final int putStatus = inputStream.read();
+            assertEquals(PUT_COMPLETED, putStatus);
+
+            dataOutputStream.writeUTF(MapOperation.GET.value());
+
+            dataOutputStream.writeInt(KEY.length);
+            dataOutputStream.write(KEY);
+
+            final int valueLength = dataInputStream.readInt();
+            assertEquals(MAX_READ_LENGTH, valueLength);
+
+            final byte[] cachedValue = new byte[valueLength];
+            // read(byte[]) may return after a partial TCP segment; readFully blocks until the buffer is filled
+            dataInputStream.readFully(cachedValue);
+
+            assertArrayEquals(value, cachedValue);
+        }
+    }
+
+    private void sendHeaderVersion(final DataOutputStream dataOutputStream, final InputStream inputStream) throws IOException {
+        dataOutputStream.write(HEADER);
+        dataOutputStream.writeInt(ProtocolVersion.V3.value());
+        // The handshake response is read as a single byte and must equal RESOURCE_OK
+        final int protocolResponse = inputStream.read();
+        assertEquals(ProtocolHandshake.RESOURCE_OK, protocolResponse);
+    }
+
+    private byte[] getValue() {
+        final SecureRandom secureRandom = new SecureRandom();
+        final byte[] value = new byte[MAX_READ_LENGTH];
+        secureRandom.nextBytes(value);
+        return value;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
index dc2d0ab3ac..d32946e83c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -25,6 +25,8 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.CacheServer;
 import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
 import org.apache.nifi.distributed.cache.server.EvictionPolicy;
 import org.apache.nifi.processor.DataUnit;
@@ -224,11 +226,11 @@ public class TestDistributedMapServerAndClient {
         // Create a server that only supports protocol version 1.
         server = new DistributedMapCacheServer() {
             @Override
-            protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
-                return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
+            protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
+                return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
                     @Override
-                    protected StandardVersionNegotiator getVersionNegotiator() {
-                        return new StandardVersionNegotiator(1);
+                    protected StandardVersionNegotiator createVersionNegotiator() {
+                        return new StandardVersionNegotiator(ProtocolVersion.V1.value());
                     }
                 };
             }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
index a5b6342dd1..17612a301e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
@@ -24,25 +24,18 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/**
- * Verify basic functionality of {@link DistributedSetCacheClientService}.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol.  It assumes that the default distributed cache port (4557)
- * is available.
- */
 public class DistributedSetCacheTest {
 
     private static TestRunner runner = null;
@@ -50,8 +43,8 @@ public class DistributedSetCacheTest {
     private static DistributedSetCacheClientService client = null;
     private static final Serializer<String> serializer = new StringSerializer();
 
-    @BeforeClass
-    public static void beforeClass() throws Exception {
+    @BeforeAll
+    public static void setRunner() throws Exception {
         final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
         runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
 
@@ -67,8 +60,8 @@ public class DistributedSetCacheTest {
         runner.enableControllerService(client);
     }
 
-    @AfterClass
-    public static void afterClass() {
+    @AfterAll
+    public static void shutdown() {
         runner.disableControllerService(client);
         runner.removeControllerService(client);