You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/05/18 16:42:14 UTC
[nifi] branch main updated: NIFI-9805 Refactored Distributed Cache Servers using Netty
This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fe424a2d42 NIFI-9805 Refactored Distributed Cache Servers using Netty
fe424a2d42 is described below
commit fe424a2d420945847172fbcb3aedd070e5b7b953
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu May 12 00:05:11 2022 -0500
NIFI-9805 Refactored Distributed Cache Servers using Netty
- Added Map and Set Cache Servers based on nifi-event-transport components
- Removed custom servers and unused socket stream components
- Reduced duplication on protocol classes
- Added checks for readable bytes
- Added mark and reset handling for buffer reads
This closes #6040
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../remote/io/socket/SocketChannelInputStream.java | 188 ---------------
.../io/socket/SocketChannelOutputStream.java | 124 ----------
.../remote/io/socket/TestSocketChannelStreams.java | 231 -------------------
.../{SetOperation.java => CacheOperation.java} | 24 +-
.../distributed/cache/operations/MapOperation.java | 3 +-
.../distributed/cache/operations/SetOperation.java | 3 +-
...tOperation.java => StandardCacheOperation.java} | 10 +-
.../nifi-distributed-cache-server/pom.xml | 5 +
.../cache/server/AbstractCacheServer.java | 247 --------------------
.../cache/server/DistributedSetCacheServer.java | 10 +-
.../distributed/cache/server/EventCacheServer.java | 113 +++++++++
.../distributed/cache/server/SetCacheServer.java | 102 ---------
.../server/codec/CacheOperationResultEncoder.java} | 30 ++-
.../cache/server/codec/CacheRequestDecoder.java | 243 ++++++++++++++++++++
.../server/codec/CacheVersionRequestHandler.java | 81 +++++++
.../server/codec/CacheVersionResponseEncoder.java | 41 ++++
.../cache/server/codec/MapCacheRequestDecoder.java | 150 ++++++++++++
.../cache/server/codec/MapCacheRequestHandler.java | 178 +++++++++++++++
.../server/codec/MapRemoveResponseEncoder.java} | 29 ++-
.../server/codec/MapSizeResponseEncoder.java} | 29 ++-
.../server/codec/MapValueResponseEncoder.java | 43 ++++
.../cache/server/codec/SetCacheRequestHandler.java | 80 +++++++
.../server/map/DistributedMapCacheServer.java | 23 +-
.../cache/server/map/MapCacheServer.java | 252 ---------------------
.../cache/server/map/StandardMapCacheServer.java | 124 ++++++++++
.../server/protocol/CacheOperationResult.java} | 24 +-
.../cache/server/protocol/CacheRequest.java} | 33 ++-
.../server/protocol/CacheVersionRequest.java} | 23 +-
.../server/protocol/CacheVersionResponse.java} | 29 ++-
.../cache/server/protocol/MapCacheRequest.java | 113 +++++++++
.../cache/server/protocol/MapRemoveResponse.java} | 23 +-
.../cache/server/protocol/MapSizeResponse.java} | 23 +-
.../cache/server/protocol/MapValueResponse.java | 49 ++--
.../cache/server/set/StandardSetCacheServer.java | 107 +++++++++
.../cache/server/map/DistributedMapCacheTest.java | 33 ++-
.../server/map/DistributedMapCacheTlsTest.java | 28 +--
.../server/map/StandardMapCacheServerTest.java | 175 ++++++++++++++
.../map/TestDistributedMapServerAndClient.java | 10 +-
.../cache/server/set/DistributedSetCacheTest.java | 25 +-
39 files changed, 1673 insertions(+), 1385 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
deleted file mode 100644
index 21f1683cfe..0000000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-
-public class SocketChannelInputStream extends InputStream {
-
- private final SocketChannel channel;
- private volatile int timeoutMillis = 30000;
- private volatile boolean interrupted = false;
- private final Selector readSelector;
-
- private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
- private Byte bufferedByte = null;
-
- public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
- // this class expects a non-blocking channel
- socketChannel.configureBlocking(false);
- this.channel = socketChannel;
-
- readSelector = Selector.open();
- this.channel.register(readSelector, SelectionKey.OP_READ);
- }
-
- public void setTimeout(final int timeoutMillis) {
- this.timeoutMillis = timeoutMillis;
- }
-
- public void consume() throws IOException {
- channel.shutdownInput();
-
- final byte[] b = new byte[4096];
- final ByteBuffer buffer = ByteBuffer.wrap(b);
- int bytesRead;
- do {
- bytesRead = channel.read(buffer);
- buffer.flip();
- } while (bytesRead > 0);
- }
-
- @Override
- public int read() throws IOException {
- if (bufferedByte != null) {
- final int retVal = bufferedByte & 0xFF;
- bufferedByte = null;
- return retVal;
- }
-
- oneByteBuffer.flip();
- oneByteBuffer.clear();
-
- final long maxTime = System.currentTimeMillis() + timeoutMillis;
-
- waitForReady();
-
- int bytesRead;
- do {
- bytesRead = channel.read(oneByteBuffer);
- if (bytesRead == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out reading from socket");
- }
- }
- } while (bytesRead == 0);
-
- if (bytesRead == -1) {
- return -1;
- }
-
- oneByteBuffer.flip();
- return oneByteBuffer.get() & 0xFF;
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- if (bufferedByte != null) {
- final byte retVal = bufferedByte;
- bufferedByte = null;
- b[off] = retVal;
- return 1;
- }
-
- waitForReady();
-
- final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
- final long maxTime = System.currentTimeMillis() + timeoutMillis;
- int bytesRead;
- do {
- bytesRead = channel.read(buffer);
- if (bytesRead == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out reading from socket");
- }
- }
- } while (bytesRead == 0);
-
- return bytesRead;
- }
-
- private void waitForReady() throws IOException {
- int readyCount = readSelector.select(timeoutMillis);
- if (readyCount < 1) {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- throw new SocketTimeoutException("Timed out reading from socket");
- }
-
- final Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
- selectedKeys.clear(); // clear the selected keys so that the Selector will be able to add them back to the ready set next time they are ready.
- }
-
- @Override
- public int available() throws IOException {
- if (bufferedByte != null) {
- return 1;
- }
-
- isDataAvailable(); // attempt to read from socket
- return (bufferedByte == null) ? 0 : 1;
- }
-
- public boolean isDataAvailable() throws IOException {
- if (bufferedByte != null) {
- return true;
- }
-
- oneByteBuffer.flip();
- oneByteBuffer.clear();
- final int bytesRead = channel.read(oneByteBuffer);
- if (bytesRead == -1) {
- throw new EOFException("Peer has closed the stream");
- }
- if (bytesRead > 0) {
- oneByteBuffer.flip();
- bufferedByte = oneByteBuffer.get();
- return true;
- }
- return false;
- }
-
- public void interrupt() {
- interrupted = true;
- readSelector.wakeup();
- }
-
- /**
- * Closes the underlying socket channel.
- *
- * @throws java.io.IOException for issues closing underlying stream
- */
- @Override
- public void close() throws IOException {
- channel.close();
- readSelector.close();
- }
-}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
deleted file mode 100644
index 6a387a9e45..0000000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-public class SocketChannelOutputStream extends OutputStream {
-
- private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
- private final SocketChannel channel;
- private volatile int timeout = 30000;
-
- private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-
- public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
- // this class expects a non-blocking channel
- socketChannel.configureBlocking(false);
- this.channel = socketChannel;
- }
-
- public void setTimeout(final int timeoutMillis) {
- this.timeout = timeoutMillis;
- }
-
- @Override
- public void write(final int b) throws IOException {
- oneByteBuffer.flip();
- oneByteBuffer.clear();
- oneByteBuffer.put((byte) b);
- oneByteBuffer.flip();
-
- final int timeoutMillis = this.timeout;
- long maxTime = System.currentTimeMillis() + timeoutMillis;
- int bytesWritten;
-
- long sleepNanos = 1L;
- while (oneByteBuffer.hasRemaining()) {
- bytesWritten = channel.write(oneByteBuffer);
- if (bytesWritten == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out writing to socket");
- }
-
- try {
- TimeUnit.NANOSECONDS.sleep(sleepNanos);
- } catch (InterruptedException e) {
- close();
- Thread.currentThread().interrupt(); // set the interrupt status
- throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
- }
-
- sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
- } else {
- return;
- }
- }
- }
-
- @Override
- public void write(final byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len) throws IOException {
- final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
-
- final int timeoutMillis = this.timeout;
- long maxTime = System.currentTimeMillis() + timeoutMillis;
- int bytesWritten;
-
- long sleepNanos = 1L;
- while (buffer.hasRemaining()) {
- bytesWritten = channel.write(buffer);
- if (bytesWritten == 0) {
- if (System.currentTimeMillis() > maxTime) {
- throw new SocketTimeoutException("Timed out writing to socket");
- }
-
- try {
- TimeUnit.NANOSECONDS.sleep(sleepNanos);
- } catch (InterruptedException e) {
- close();
- Thread.currentThread().interrupt(); // set the interrupt status
- throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
- }
-
- sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
- } else {
- maxTime = System.currentTimeMillis() + timeoutMillis;
- }
- }
- }
-
- /**
- * Closes the underlying SocketChannel
- *
- * @throws java.io.IOException if issues closing underlying stream
- */
- @Override
- public void close() throws IOException {
- channel.close();
- }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
deleted file mode 100644
index 27a8807a3b..0000000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-//package nifi.remote.io.socket;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertTrue;
-//
-//import java.io.ByteArrayOutputStream;
-//import java.io.DataInputStream;
-//import java.io.DataOutputStream;
-//import java.io.IOException;
-//import java.io.InputStream;
-//import java.io.OutputStream;
-//import java.net.InetSocketAddress;
-//import java.net.ServerSocket;
-//import java.net.Socket;
-//import java.net.SocketTimeoutException;
-//import java.net.URI;
-//import java.net.URISyntaxException;
-//import java.nio.channels.SocketChannel;
-//import java.util.Arrays;
-//import java.util.concurrent.TimeUnit;
-//
-//import javax.net.ServerSocketFactory;
-//
-//import nifi.events.EventReporter;
-//import nifi.groups.RemoteProcessGroup;
-//import nifi.remote.RemoteGroupPort;
-//import nifi.remote.RemoteResourceFactory;
-//import nifi.remote.StandardSiteToSiteProtocol;
-//import nifi.remote.TransferDirection;
-//import nifi.remote.exception.HandshakeException;
-//import nifi.remote.exception.PortNotRunningException;
-//import nifi.remote.exception.UnknownPortException;
-//import nifi.remote.protocol.CommunicationsProtocol;
-//import nifi.remote.protocol.CommunicationsSession;
-//import nifi.util.NiFiProperties;
-//
-//import org.junit.Ignore;
-//import org.junit.Test;
-//import org.mockito.Mockito;
-//
-//@Ignore("For local testing only")
-//public class TestSocketChannelStreams {
-// public static final int DATA_SIZE = 8 * 1024 * 1024;
-//
-// @Test
-// public void testSendingToLocalInstanceWithoutSSL() throws IOException, InterruptedException, HandshakeException, UnknownPortException, PortNotRunningException, URISyntaxException {
-// System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
-// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000));
-// channel.configureBlocking(false);
-//
-// final CommunicationsSession commsSession;
-// commsSession = new SocketChannelCommunicationsSession(channel, "", null);
-// commsSession.setUri("nifi://localhost:5000");
-// final DataInputStream dis = new DataInputStream(commsSession.getRequest().getInputStream());
-// final DataOutputStream dos = new DataOutputStream(commsSession.getResponse().getOutputStream());
-//
-// dos.write(CommunicationsProtocol.MAGIC_BYTES);
-// dos.flush();
-//
-// final EventReporter eventReporter = Mockito.mock(EventReporter.class);
-// final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, nifiProperties);
-//
-// final StandardSiteToSiteProtocol negotiatedProtocol = (StandardSiteToSiteProtocol) RemoteResourceFactory.initiateResourceNegotiation(proposedProtocol, dis, dos);
-// System.out.println(negotiatedProtocol);
-//
-// final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class);
-// Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000);
-// Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") );
-//
-// final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
-// Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a");
-// Mockito.when(port.getName()).thenReturn("Data In");
-// Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg);
-//
-// negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND);
-// }
-//
-// @Test
-// public void testInputOutputStreams() throws IOException, InterruptedException {
-// final ServerThread server = new ServerThread();
-// server.start();
-//
-// int port = server.getPort();
-// while ( port <= 0 ) {
-// Thread.sleep(10L);
-// port = server.getPort();
-// }
-//
-// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));
-// channel.configureBlocking(false);
-//
-// final OutputStream out = new SocketChannelOutputStream(channel);
-// final InputStream in = new SocketChannelInputStream(channel);
-// final DataInputStream dataIn = new DataInputStream(in);
-//
-// final byte[] sent = new byte[DATA_SIZE];
-// for (int i=0; i < sent.length; i++) {
-// sent[i] = (byte) (i % 255);
-// }
-//
-// for (int itr=0; itr < 5; itr++) {
-// final long start = System.nanoTime();
-// out.write(sent);
-// final long nanos = System.nanoTime() - start;
-// final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-// final float seconds = (float) millis / 1000F;
-// final float megabytes = (float) DATA_SIZE / (1024F * 1024F);
-// final float MBperS = megabytes / seconds;
-// System.out.println("Millis: " + millis + "; MB/s: " + MBperS);
-//
-// Thread.sleep(2500L);
-// final byte[] received = server.getReceivedData();
-// System.out.println("Server received " + received.length + " bytes");
-// server.clearReceivedData();
-// assertTrue(Arrays.equals(sent, received));
-//
-// final long val = dataIn.readLong();
-// assertEquals(DATA_SIZE, val);
-// System.out.println(val);
-// }
-//
-// server.shutdown();
-// }
-//
-// public final long toLong(final byte[] buffer) throws IOException {
-// return (((long)buffer[0] << 56) +
-// ((long)(buffer[1] & 255) << 48) +
-// ((long)(buffer[2] & 255) << 40) +
-// ((long)(buffer[3] & 255) << 32) +
-// ((long)(buffer[4] & 255) << 24) +
-// ((buffer[5] & 255) << 16) +
-// ((buffer[6] & 255) << 8) +
-// ((buffer[7] & 255) << 0));
-// }
-//
-// private static class ServerThread extends Thread {
-// private int listeningPort;
-// private final ByteArrayOutputStream received = new ByteArrayOutputStream();
-//
-// private volatile int readingDelay = 0;
-// private volatile boolean shutdown = false;
-//
-// public ServerThread() {
-// }
-//
-// public int getPort() {
-// return listeningPort;
-// }
-//
-// public byte[] getReceivedData() {
-// return received.toByteArray();
-// }
-//
-// @Override
-// public void run() {
-// try {
-// final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
-// final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0);
-// this.listeningPort = serverSocket.getLocalPort();
-//
-// final Socket socket = serverSocket.accept();
-// final InputStream stream = socket.getInputStream();
-// final DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
-//
-// final byte[] buffer = new byte[4096];
-// int len;
-//
-// while (!shutdown) {
-// try {
-// len = stream.read(buffer);
-//
-// System.out.println("Received " + len + " bytes");
-//
-// if ( readingDelay > 0 ) {
-// try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {}
-// }
-// } catch (final SocketTimeoutException e) {
-// continue;
-// }
-//
-// if ( len < 0 ) {
-// return;
-// }
-//
-// received.write(buffer, 0, len);
-//
-// final long length = received.size();
-// if ( length % (DATA_SIZE) == 0 ) {
-// dos.writeLong(length);
-// dos.flush();
-// }
-// }
-//
-// System.out.println("Server successfully shutdown");
-// } catch (final Exception e) {
-// e.printStackTrace();
-// }
-// }
-//
-// public void clearReceivedData() {
-// this.received.reset();
-// }
-//
-// public void shutdown() {
-// this.shutdown = true;
-// }
-//
-// public void delayReading(final int millis) {
-// this.readingDelay = millis;
-// }
-// }
-//
-//}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
similarity index 69%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
index ce6718f6f5..4454984f31 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/CacheOperation.java
@@ -17,21 +17,13 @@
package org.apache.nifi.distributed.cache.operations;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Operation definition
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
-
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
- }
-
- public String value() {
- return operation;
- }
+public interface CacheOperation {
+ /**
+ * Get Cache Operation value used during protocol communication
+ *
+ * @return Cache Operation value
+ */
+ String value();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
index 12177c09a7..e44edee5ed 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java
@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations;
/**
* Represents a distributed set cache operation which may be invoked.
*/
-public enum MapOperation {
+public enum MapOperation implements CacheOperation {
CONTAINS_KEY("containsKey"),
FETCH("fetch"),
GET("get"),
@@ -41,6 +41,7 @@ public enum MapOperation {
this.operation = operation;
}
+ @Override
public String value() {
return operation;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
index ce6718f6f5..c07af160ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations;
/**
* Represents a distributed set cache operation which may be invoked.
*/
-public enum SetOperation {
+public enum SetOperation implements CacheOperation {
ADD_IF_ABSENT("addIfAbsent"),
CONTAINS("contains"),
REMOVE("remove"),
@@ -31,6 +31,7 @@ public enum SetOperation {
this.operation = operation;
}
+ @Override
public String value() {
return operation;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
similarity index 82%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
index ce6718f6f5..23eef3271a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/StandardCacheOperation.java
@@ -17,20 +17,18 @@
package org.apache.nifi.distributed.cache.operations;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Standard Cache Operation
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
+public enum StandardCacheOperation implements CacheOperation {
CLOSE("close");
private final String operation;
- SetOperation(final String operation) {
+ StandardCacheOperation(final String operation) {
this.operation = operation;
}
+ @Override
public String value() {
return operation;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
index 5e42ee395b..a7ad1d5bc1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/pom.xml
@@ -30,6 +30,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-protocol</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-event-transport</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
deleted file mode 100644
index fb3d597ed3..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractCacheServer implements CacheServer {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
-
- private final String identifier;
- private final int port;
- private final int maxReadSize;
- private final SSLContext sslContext;
- protected volatile boolean stopped = false;
- private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
-
- private volatile ServerSocketChannel serverSocketChannel;
-
- public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxReadSize) {
- this.identifier = identifier;
- this.port = port;
- this.sslContext = sslContext;
- this.maxReadSize = maxReadSize;
- }
-
- @Override
- public int getPort() {
- return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort();
- }
-
- @Override
- public void start() throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(true);
- serverSocketChannel.bind(new InetSocketAddress(port));
-
- final Runnable runnable = new Runnable() {
-
- @Override
- public void run() {
- while (true) {
- final SocketChannel socketChannel;
- try {
- socketChannel = serverSocketChannel.accept();
- logger.debug("Connected to {}", new Object[]{socketChannel});
- } catch (final IOException e) {
- if (!stopped) {
- logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- return;
- }
-
- final Runnable processInputRunnable = new Runnable() {
- @Override
- public void run() {
- final InputStream rawInputStream;
- final OutputStream rawOutputStream;
- final String peer = socketChannel.socket().getInetAddress().getHostName();
-
- try {
- if (sslContext == null) {
- rawInputStream = new SocketChannelInputStream(socketChannel);
- rawOutputStream = new SocketChannelOutputStream(socketChannel);
- } else {
- final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
- sslSocketChannel.connect();
- rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
- rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
- }
- } catch (final IOException e) {
- logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e);
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- try {
- socketChannel.close();
- } catch (final IOException swallow) {
- }
-
- return;
- }
- try (final InputStream in = new BufferedInputStream(rawInputStream);
- final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
-
- final VersionNegotiator versionNegotiator = getVersionNegotiator();
-
- ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
-
- boolean continueComms = true;
- while (continueComms) {
- continueComms = listen(in, out, versionNegotiator.getVersion());
- }
- // client has issued 'close'
- logger.debug("Client issued close on {}", new Object[]{socketChannel});
- } catch (final SocketTimeoutException e) {
- logger.debug("30 sec timeout reached", e);
- } catch (final IOException | HandshakeException e) {
- if (!stopped) {
- logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()});
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- } finally {
- processInputThreads.remove(Thread.currentThread());
- }
- }
- };
-
- final Thread processInputThread = new Thread(processInputRunnable);
- processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
- processInputThread.setDaemon(true);
- processInputThread.start();
- processInputThreads.add(processInputThread);
- }
- }
- };
-
- final Thread thread = new Thread(runnable);
- thread.setDaemon(true);
- thread.setName("Distributed Cache Server: " + identifier);
- thread.start();
- }
-
- /**
- * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
- * for details of each version enhancements.
- */
- protected StandardVersionNegotiator getVersionNegotiator() {
- return new StandardVersionNegotiator(1);
- }
-
- @Override
- public void stop() throws IOException {
- stopped = true;
- logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
-
- if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
- try {
- serverSocketChannel.close();
- } catch (final IOException e) {
- logger.warn("Server Socket Close Failed", e);
- }
- }
- // need to close out the created SocketChannels...this is done by interrupting
- // the created threads that loop on listen().
- for (final Thread processInputThread : processInputThreads) {
- processInputThread.interrupt();
- int i = 0;
- while (!processInputThread.isInterrupted() && i++ < 5) {
- try {
- Thread.sleep(50); // allow thread to gracefully terminate
- } catch (final InterruptedException e) {
- }
- }
- }
- processInputThreads.clear();
- }
-
- @Override
- public String toString() {
- return "CacheServer[id=" + identifier + "]";
- }
-
- /**
- * Listens for incoming data and communicates with remote peer
- *
- * @param in in
- * @param out out
- * @param version version
- * @return <code>true</code> if communications should continue, <code>false</code> otherwise
- * @throws IOException ex
- */
- protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
-
- /**
- * Read a length-prefixed value from the {@link DataInputStream}.
- *
- * @param dis the {@link DataInputStream} from which to read the value
- * @return the serialized representation of the value
- * @throws IOException on failure to read from the input stream
- */
- protected byte[] readValue(final DataInputStream dis) throws IOException {
- final int numBytes = validateSize(dis.readInt());
- final byte[] buffer = new byte[numBytes];
- dis.readFully(buffer);
- return buffer;
- }
-
- /**
- * Validate a size value received from the {@link DataInputStream} against the configured maximum.
- *
- * @param size the size value received from the {@link DataInputStream}
- * @return the size value, iff it passes validation; otherwise, an exception is thrown
- */
- protected int validateSize(final int size) {
- if (size <= maxReadSize) {
- return size;
- } else {
- throw new IllegalStateException(String.format("Size [%d] exceeds maximum configured read [%d]", size, maxReadSize));
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
index cb81725f28..201ef93455 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -21,6 +21,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.server.set.StandardSetCacheServer;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.ssl.SSLContextService;
@@ -38,12 +39,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
final int maxReadSize = context.getProperty(MAX_READ_SIZE).asDataSize(DataUnit.B).intValue();
- final SSLContext sslContext;
- if (sslContextService == null) {
- sslContext = null;
- } else {
- sslContext = sslContextService.createContext();
- }
+ final SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();
final EvictionPolicy evictionPolicy;
switch (evictionPolicyName) {
@@ -63,7 +59,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
try {
final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
- return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
+ return new StandardSetCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
} catch (final Exception e) {
throw new RuntimeException(e);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java
new file mode 100644
index 0000000000..059bbb59ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EventCacheServer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.util.ClientAuth;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetAddress;
+import java.util.Objects;
+
+/**
+ * Abstract Event Cache Server with standard lifecycle methods
+ */
+public abstract class EventCacheServer implements CacheServer {
+ private static final InetAddress ALL_ADDRESSES = null;
+
+ private final ComponentLog log;
+
+ private final int port;
+
+ private EventServer eventServer;
+
+ public EventCacheServer(
+ final ComponentLog log,
+ final int port
+ ) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.port = port;
+ }
+
+ /**
+ * Start Server
+ *
+ */
+ @Override
+ public void start() {
+ eventServer = createEventServer();
+ log.info("Started Cache Server Port [{}]", port);
+ }
+
+ /**
+ * Stop Server
+ *
+ */
+ @Override
+ public void stop() {
+ if (eventServer == null) {
+ log.info("Server not running");
+ } else {
+ eventServer.shutdown();
+ }
+
+ log.info("Stopped Cache Server Port [{}]", port);
+ }
+
+ /**
+ * Get Server Port Number
+ *
+ * @return Port Number
+ */
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Create Event Server Factory with standard properties
+ *
+ * @param identifier Component Identifier
+ * @param sslContext SSL Context is null when not configured
+ * @return Netty Event Server Factory
+ */
+ protected NettyEventServerFactory createEventServerFactory(final String identifier, final SSLContext sslContext) {
+ final NettyEventServerFactory eventServerFactory = new NettyEventServerFactory(ALL_ADDRESSES, port, TransportProtocol.TCP);
+ eventServerFactory.setSslContext(sslContext);
+ eventServerFactory.setClientAuth(ClientAuth.REQUIRED);
+
+ final String threadNamePrefix = String.format("%s[%s]", getClass().getSimpleName(), identifier);
+ eventServerFactory.setThreadNamePrefix(threadNamePrefix);
+
+ eventServerFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ eventServerFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+
+ return eventServerFactory;
+ }
+
+ /**
+ * Create Event Server
+ *
+ * @return Event Server
+ */
+ protected abstract EventServer createEventServer();
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
deleted file mode 100644
index 5e1fb03eec..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
-import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
-
-public class SetCacheServer extends AbstractCacheServer {
-
- private final SetCache cache;
-
- public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
- final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
- super(identifier, sslContext, port, maxReadSize);
-
- final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
-
- if (persistencePath == null) {
- this.cache = simpleCache;
- } else {
- final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
- persistentCache.restore();
- this.cache = persistentCache;
- }
- }
-
- @Override
- protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
- final DataInputStream dis = new DataInputStream(in);
- final DataOutputStream dos = new DataOutputStream(out);
-
- final String action = dis.readUTF();
- if (action.equals("close")) {
- return false;
- }
-
- final byte[] value = readValue(dis);
- final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
-
- final SetCacheResult response;
- switch (action) {
- case "addIfAbsent":
- response = cache.addIfAbsent(valueBuffer);
- break;
- case "contains":
- response = cache.contains(valueBuffer);
- break;
- case "remove":
- response = cache.remove(valueBuffer);
- break;
- default:
- throw new IOException("IllegalRequest");
- }
-
- dos.writeBoolean(response.getResult());
- dos.flush();
-
- return true;
- }
-
- @Override
- public void stop() throws IOException {
- try {
- super.stop();
- } finally {
- cache.shutdown();
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!stopped) {
- stop();
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
similarity index 52%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
index ce6718f6f5..78b86235c4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheOperationResultEncoder.java
@@ -14,24 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Cache Operation Results
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
-
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
- }
-
- public String value() {
- return operation;
+@ChannelHandler.Sharable
+public class CacheOperationResultEncoder extends MessageToByteEncoder<CacheOperationResult> {
+ @Override
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheOperationResult cacheOperationResult, final ByteBuf byteBuf) {
+ final int code = cacheOperationResult.isSuccess() ? 1 : 0;
+ byteBuf.writeByte(code);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java
new file mode 100644
index 0000000000..51b4bfa608
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheRequestDecoder.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.StandardCacheOperation;
+import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Cache Request Decoder processes bytes and decodes cache version and operation requests
+ */
+public class CacheRequestDecoder extends ByteToMessageDecoder {
+ private static final int HEADER_LENGTH = 4;
+
+ private static final int LONG_LENGTH = 8;
+
+ private static final int INT_LENGTH = 4;
+
+ private static final int SHORT_LENGTH = 2;
+
+ private final AtomicBoolean headerReceived = new AtomicBoolean();
+
+ private final AtomicInteger protocolVersion = new AtomicInteger();
+
+ private final ComponentLog log;
+
+ private final int maxLength;
+
+ private final CacheOperation[] supportedOperations;
+
+ public CacheRequestDecoder(
+ final ComponentLog log,
+ final int maxLength,
+ final CacheOperation[] supportedOperations
+ ) {
+ this.log = log;
+ this.maxLength = maxLength;
+ this.supportedOperations = supportedOperations;
+ }
+
+ /**
+ * Decode Byte Buffer reading header on initial connection followed by protocol version and cache operations
+ *
+ * @param channelHandlerContext Channel Handler Context
+ * @param byteBuf Byte Buffer
+ * @param objects Decoded Objects
+ */
+ @Override
+ protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) {
+ if (!headerReceived.get()) {
+ readHeader(byteBuf, channelHandlerContext.channel().remoteAddress());
+ }
+
+ if (protocolVersion.get() == 0) {
+ final OptionalInt clientVersion = readInt(byteBuf);
+ if (clientVersion.isPresent()) {
+ final int clientVersionFound = clientVersion.getAsInt();
+ log.debug("Protocol Version [{}] Received [{}]", clientVersionFound, channelHandlerContext.channel().remoteAddress());
+ final CacheVersionRequest cacheVersionRequest = new CacheVersionRequest(clientVersionFound);
+ objects.add(cacheVersionRequest);
+ }
+ } else {
+ // Mark ByteBuf reader index to reset when sufficient bytes are not found
+ byteBuf.markReaderIndex();
+
+ final Optional<CacheOperation> cacheOperation = readOperation(byteBuf);
+ if (cacheOperation.isPresent()) {
+ final CacheOperation cacheOperationFound = cacheOperation.get();
+
+ final Optional<Object> cacheRequest = readRequest(cacheOperationFound, byteBuf);
+ if (cacheRequest.isPresent()) {
+ final Object cacheRequestFound = cacheRequest.get();
+ objects.add(cacheRequestFound);
+ } else if (StandardCacheOperation.CLOSE.value().contentEquals(cacheOperationFound.value())) {
+ objects.add(new CacheRequest(cacheOperationFound, null));
+ } else {
+ byteBuf.resetReaderIndex();
+ log.debug("Cache Operation [{}] request not processed", cacheOperationFound);
+ }
+ } else {
+ byteBuf.resetReaderIndex();
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
+ log.warn("Request Decoding Failed: Closing Connection [{}]", context.channel().remoteAddress(), cause);
+ context.close();
+ }
+
+ /**
+ * Set Protocol Version based on version negotiated in other handlers
+ *
+ * @param protocolVersion Protocol Version
+ */
+ public void setProtocolVersion(final int protocolVersion) {
+ this.protocolVersion.getAndSet(protocolVersion);
+ }
+
+ /**
+ * Read Request Object based on Cache Operation
+ *
+ * @param cacheOperation Cache Operation
+ * @param byteBuf Byte Buffer
+ * @return Request Object or empty when buffer does not contain sufficient bytes
+ */
+ protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final Optional<byte[]> bytes = readBytes(byteBuf);
+ return bytes.map(value -> new CacheRequest(cacheOperation, value));
+ }
+
+ /**
+ * Read Bytes from buffer based on length indicated
+ *
+ * @param byteBuf Byte Buffer
+ * @return Bytes read or null when buffer does not contain sufficient bytes
+ */
+ protected Optional<byte[]> readBytes(final ByteBuf byteBuf) {
+ final Optional<byte[]> bytesRead;
+
+ final OptionalInt length = readInt(byteBuf);
+ if (length.isPresent()) {
+ final int readableBytes = byteBuf.readableBytes();
+ final int lengthFound = length.getAsInt();
+ if (readableBytes >= lengthFound) {
+ bytesRead = Optional.of(readBytes(byteBuf, lengthFound));
+ } else {
+ bytesRead = Optional.empty();
+ }
+ } else {
+ bytesRead = Optional.empty();
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Read Unicode String from buffer based on length of available bytes
+ *
+ * @param byteBuf Byte Buffer
+ * @return String or null when buffer does not contain sufficient bytes
+ */
+ protected Optional<String> readUnicodeString(final ByteBuf byteBuf) {
+ final String unicodeString;
+
+ if (byteBuf.readableBytes() >= SHORT_LENGTH) {
+ final int length = byteBuf.readUnsignedShort();
+ if (length > maxLength) {
+ throw new IllegalArgumentException(String.format("Maximum Operation Length [%d] exceeded [%d]", maxLength, length));
+ }
+ if (byteBuf.readableBytes() >= length) {
+ unicodeString = byteBuf.readCharSequence(length, StandardCharsets.UTF_8).toString();
+ } else {
+ unicodeString = null;
+ }
+ } else {
+ unicodeString = null;
+ }
+
+ return Optional.ofNullable(unicodeString);
+ }
+
+ /**
+ * Read Integer from buffer
+ *
+ * @param byteBuf Byte Buffer
+ * @return Integer or empty when buffer does not contain sufficient bytes
+ */
+ protected OptionalInt readInt(final ByteBuf byteBuf) {
+ final Integer integer;
+
+ final int readableBytes = byteBuf.readableBytes();
+ if (readableBytes >= INT_LENGTH) {
+ integer = byteBuf.readInt();
+ if (integer > maxLength) {
+ throw new IllegalArgumentException(String.format("Maximum Length [%d] exceeded [%d]", maxLength, integer));
+ }
+ } else {
+ integer = null;
+ }
+
+ return integer == null ? OptionalInt.empty() : OptionalInt.of(integer);
+ }
+
+ protected OptionalLong readLong(final ByteBuf byteBuf) {
+ final int readableBytes = byteBuf.readableBytes();
+ return readableBytes >= LONG_LENGTH ? OptionalLong.of(byteBuf.readLong()) : OptionalLong.empty();
+ }
+
+ private byte[] readBytes(final ByteBuf byteBuf, final int length) {
+ final byte[] bytes = new byte[length];
+ byteBuf.readBytes(bytes);
+ return bytes;
+ }
+
+ private Optional<CacheOperation> readOperation(final ByteBuf byteBuf) {
+ final Optional<String> clientOperation = readUnicodeString(byteBuf);
+
+ return clientOperation.map(operation -> Arrays.stream(supportedOperations)
+ .filter(supportedOperation -> supportedOperation.value().contentEquals(operation))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(String.format("Cache Operation not supported [%d]", operation.length())))
+ );
+ }
+
+ private void readHeader(final ByteBuf byteBuf, final SocketAddress remoteAddress) {
+ if (byteBuf.readableBytes() >= HEADER_LENGTH) {
+ byteBuf.readBytes(HEADER_LENGTH);
+ headerReceived.getAndSet(true);
+ log.debug("Header Received [{}]", remoteAddress);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java
new file mode 100644
index 0000000000..4fc228f048
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionRequestHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Handler for Cache Version Requests responsible for negotiating supported version
+ */
+public class CacheVersionRequestHandler extends SimpleChannelInboundHandler<CacheVersionRequest> {
+ private final ComponentLog log;
+
+ private final VersionNegotiator versionNegotiator;
+
+ public CacheVersionRequestHandler(
+ final ComponentLog log,
+ final VersionNegotiator versionNegotiator
+ ) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.versionNegotiator = Objects.requireNonNull(versionNegotiator, "Version Negotiator required");
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheVersionRequest cacheVersionRequest) {
+ for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) {
+ final ChannelHandler channelHandler = entry.getValue();
+ if (channelHandler instanceof CacheRequestDecoder) {
+ final CacheRequestDecoder cacheRequestDecoder = (CacheRequestDecoder) channelHandler;
+
+ final int requestedVersion = cacheVersionRequest.getVersion();
+ final CacheVersionResponse cacheVersionResponse = handleVersion(cacheRequestDecoder, requestedVersion);
+ channelHandlerContext.writeAndFlush(cacheVersionResponse);
+ }
+ }
+ }
+
+ private CacheVersionResponse handleVersion(final CacheRequestDecoder cacheRequestDecoder, final int requestedVersion) {
+ final CacheVersionResponse cacheVersionResponse;
+
+ if (versionNegotiator.isVersionSupported(requestedVersion)) {
+ log.debug("Cache Version Supported [{}]", requestedVersion);
+ cacheRequestDecoder.setProtocolVersion(requestedVersion);
+ cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.RESOURCE_OK, requestedVersion);
+ } else {
+ final Integer preferredVersion = versionNegotiator.getPreferredVersion(requestedVersion);
+ if (preferredVersion == null) {
+ log.debug("Cache Version Rejected [{}]", requestedVersion);
+ cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.ABORT, requestedVersion);
+ } else {
+ log.debug("Cache Version Rejected [{}] Preferred [{}]", requestedVersion, preferredVersion);
+ cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.DIFFERENT_RESOURCE_VERSION, preferredVersion);
+ }
+ }
+
+ return cacheVersionResponse;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java
new file mode 100644
index 0000000000..76b0cda7a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/CacheVersionResponseEncoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
+
+/**
+ * Message Encoder for Cache Version Responses
+ */
+@ChannelHandler.Sharable
+public class CacheVersionResponseEncoder extends MessageToByteEncoder<CacheVersionResponse> {
+ @Override
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheVersionResponse cacheVersionResponse, final ByteBuf byteBuf) {
+ final int statusCode = cacheVersionResponse.getStatusCode();
+ byteBuf.writeByte(statusCode);
+
+ if (ProtocolHandshake.DIFFERENT_RESOURCE_VERSION == statusCode) {
+ final int version = cacheVersionResponse.getVersion();
+ byteBuf.writeInt(version);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java
new file mode 100644
index 0000000000..f482d6912b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Cache Request Decoder processes bytes and decodes cache version and operation requests
+ */
+public class MapCacheRequestDecoder extends CacheRequestDecoder {
+
+ public MapCacheRequestDecoder(
+ final ComponentLog log,
+ final int maxLength,
+ final CacheOperation[] supportedOperations
+ ) {
+ super(log, maxLength, supportedOperations);
+ }
+
+ @Override
+ protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final MapCacheRequest request;
+
+ if (MapOperation.CONTAINS_KEY == cacheOperation) {
+ request = readKeyRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.FETCH == cacheOperation) {
+ request = readKeyRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.GET == cacheOperation) {
+ request = readKeyRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation) {
+ request = readKeyValueRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.KEYSET == cacheOperation) {
+ request = new MapCacheRequest(cacheOperation);
+ } else if (MapOperation.REMOVE == cacheOperation) {
+ request = readKeyRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
+ request = readPatternRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
+ request = readPatternRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
+ request = readKeyRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.REPLACE == cacheOperation) {
+ request = readKeyRevisionValueRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.SUBMAP == cacheOperation) {
+ request = readSubMapRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.PUT == cacheOperation) {
+ request = readKeyValueRequest(cacheOperation, byteBuf);
+ } else if (MapOperation.PUT_IF_ABSENT == cacheOperation) {
+ request = readKeyValueRequest(cacheOperation, byteBuf);
+ } else {
+ request = new MapCacheRequest(cacheOperation);
+ }
+
+ return Optional.ofNullable(request);
+ }
+
+ private MapCacheRequest readKeyRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final Optional<byte[]> key = readBytes(byteBuf);
+ return key.map(bytes -> new MapCacheRequest(cacheOperation, bytes)).orElse(null);
+ }
+
+ private MapCacheRequest readKeyValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final MapCacheRequest mapCacheRequest;
+
+ final Optional<byte[]> key = readBytes(byteBuf);
+ if (key.isPresent()) {
+ final Optional<byte[]> value = readBytes(byteBuf);
+ mapCacheRequest = value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), valueBytes)).orElse(null);
+ } else {
+ mapCacheRequest = null;
+ }
+
+ return mapCacheRequest;
+ }
+
+ private MapCacheRequest readKeyRevisionValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final MapCacheRequest mapCacheRequest;
+
+ final Optional<byte[]> key = readBytes(byteBuf);
+ if (key.isPresent()) {
+ final OptionalLong revision = readLong(byteBuf);
+ if (revision.isPresent()) {
+ final Optional<byte[]> value = readBytes(byteBuf);
+ mapCacheRequest = value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), revision.getAsLong(), valueBytes)).orElse(null);
+ } else {
+ mapCacheRequest = null;
+ }
+ } else {
+ mapCacheRequest = null;
+ }
+
+ return mapCacheRequest;
+ }
+
+ private MapCacheRequest readPatternRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final Optional<String> pattern = readUnicodeString(byteBuf);
+ final Optional<MapCacheRequest> request = pattern.map(requestedPattern -> new MapCacheRequest(cacheOperation, requestedPattern));
+ return request.orElse(null);
+ }
+
+ private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
+ final MapCacheRequest mapCacheRequest;
+
+ final OptionalInt keys = readInt(byteBuf);
+ if (keys.isPresent()) {
+ final List<byte[]> subMapKeys = new ArrayList<>();
+ for (int i = 0; i < keys.getAsInt(); i++) {
+ final Optional<byte[]> key = readBytes(byteBuf);
+ if (key.isPresent()) {
+ subMapKeys.add(key.get());
+ } else {
+ // Clear Map to return null and retry on subsequent invocations
+ subMapKeys.clear();
+ break;
+ }
+
+ }
+
+ mapCacheRequest = subMapKeys.isEmpty() ? null : new MapCacheRequest(cacheOperation, subMapKeys);
+ } else {
+ mapCacheRequest = null;
+ }
+
+ return mapCacheRequest;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java
new file mode 100644
index 0000000000..2674962d1d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.server.map.MapCache;
+import org.apache.nifi.distributed.cache.server.map.MapCacheRecord;
+import org.apache.nifi.distributed.cache.server.map.MapPutResult;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
+import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
+import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
+import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
+import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Handler for Map Cache Request operations interacts with the Map Cache and writes Results
+ */
+@ChannelHandler.Sharable
+public class MapCacheRequestHandler extends SimpleChannelInboundHandler<MapCacheRequest> {
+ private static final long REVISION_NOT_FOUND = -1;
+
+ private final ComponentLog log;
+
+ private final MapCache mapCache;
+
+ public MapCacheRequestHandler(
+ final ComponentLog log,
+ final MapCache mapCache
+ ) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.mapCache = Objects.requireNonNull(mapCache, "Map Cache required");
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final MapCacheRequest mapCacheRequest) throws Exception {
+ final CacheOperation cacheOperation = mapCacheRequest.getCacheOperation();
+
+ if (MapOperation.CLOSE == cacheOperation) {
+ log.debug("Map Cache Operation [{}] received", cacheOperation);
+ channelHandlerContext.close();
+ } else if (MapOperation.CONTAINS_KEY == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final boolean success = mapCache.containsKey(key);
+ writeResult(channelHandlerContext, cacheOperation, success);
+ } else if (MapOperation.GET == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer cached = mapCache.get(key);
+ writeBytes(channelHandlerContext, cacheOperation, cached);
+ } else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+ final MapPutResult result = mapCache.putIfAbsent(key, value);
+ final ByteBuffer cached = result.isSuccessful() ? null : result.getExisting().getValue();
+ writeBytes(channelHandlerContext, cacheOperation, cached);
+ } else if (MapOperation.FETCH == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final MapCacheRecord mapCacheRecord = mapCache.fetch(key);
+ writeMapCacheRecord(channelHandlerContext, cacheOperation, mapCacheRecord);
+ } else if (MapOperation.KEYSET == cacheOperation) {
+ final Set<ByteBuffer> keySet = mapCache.keySet();
+ writeSize(channelHandlerContext, cacheOperation, keySet.size());
+ for (final ByteBuffer key : keySet) {
+ writeBytes(channelHandlerContext, cacheOperation, key);
+ }
+ } else if (MapOperation.PUT == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+ final MapPutResult result = mapCache.put(key, value);
+ writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+ } else if (MapOperation.PUT_IF_ABSENT == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+ final MapPutResult result = mapCache.putIfAbsent(key, value);
+ writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+ } else if (MapOperation.REMOVE == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer removed = mapCache.remove(key);
+ final boolean success = removed != null;
+ writeResult(channelHandlerContext, cacheOperation, success);
+ } else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer removed = mapCache.remove(key);
+ writeBytes(channelHandlerContext, cacheOperation, removed);
+ } else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
+ final String pattern = mapCacheRequest.getPattern();
+ final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
+ final int size = removed == null ? 0 : removed.size();
+ writeRemoved(channelHandlerContext, cacheOperation, size);
+ } else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
+ final String pattern = mapCacheRequest.getPattern();
+ final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
+ if (removed == null) {
+ writeRemoved(channelHandlerContext, cacheOperation, 0);
+ } else {
+ writeSize(channelHandlerContext, cacheOperation, removed.size());
+ for (final Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
+ writeBytes(channelHandlerContext, cacheOperation, entry.getKey());
+ writeBytes(channelHandlerContext, cacheOperation, entry.getValue());
+ }
+ }
+ } else if (MapOperation.REPLACE == cacheOperation) {
+ final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
+ final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
+ final MapCacheRecord mapCacheRecord = new MapCacheRecord(key, value, mapCacheRequest.getRevision());
+ final MapPutResult result = mapCache.replace(mapCacheRecord);
+ writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
+ } else if (MapOperation.SUBMAP == cacheOperation) {
+ final List<byte[]> keys = mapCacheRequest.getKeys();
+ for (final byte[] key : keys) {
+ final ByteBuffer requestedKey = ByteBuffer.wrap(key);
+ final ByteBuffer value = mapCache.get(requestedKey);
+ writeBytes(channelHandlerContext, cacheOperation, value);
+ }
+ } else {
+ log.warn("Map Cache Operation [{}] not supported", cacheOperation);
+ }
+ }
+
+ private void writeResult(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final boolean success) {
+ log.debug("Map Cache Operation [{}] Success [{}]", cacheOperation, success);
+ final CacheOperationResult cacheOperationResult = new CacheOperationResult(success);
+ channelHandlerContext.writeAndFlush(cacheOperationResult);
+ }
+
+ private void writeRemoved(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final long size) {
+ final MapRemoveResponse mapRemoveResponse = new MapRemoveResponse(size);
+ log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
+ channelHandlerContext.writeAndFlush(mapRemoveResponse);
+ }
+
+ private void writeSize(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final int size) {
+ final MapSizeResponse mapSizeResponse = new MapSizeResponse(size);
+ log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
+ channelHandlerContext.writeAndFlush(mapSizeResponse);
+ }
+
+ private void writeBytes(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final ByteBuffer buffer) {
+ final byte[] bytes = buffer == null ? null : buffer.array();
+ final int length = bytes == null ? 0 : bytes.length;
+ final MapValueResponse mapValueResponse = new MapValueResponse(length, bytes);
+ log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, length);
+ channelHandlerContext.writeAndFlush(mapValueResponse);
+ }
+
+ private void writeMapCacheRecord(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final MapCacheRecord mapCacheRecord) {
+ final long revision = mapCacheRecord == null ? REVISION_NOT_FOUND : mapCacheRecord.getRevision();
+ final byte[] value = mapCacheRecord == null ? null : mapCacheRecord.getValue().array();
+ final int length = value == null ? 0 : value.length;
+ final MapValueResponse mapValueResponse = new MapValueResponse(length, value, revision);
+ log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, length);
+ channelHandlerContext.writeAndFlush(mapValueResponse);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
similarity index 54%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
index ce6718f6f5..44d7ce26d9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapRemoveResponseEncoder.java
@@ -14,24 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Map Remove Responses
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
-
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
- }
-
- public String value() {
- return operation;
+@ChannelHandler.Sharable
+public class MapRemoveResponseEncoder extends MessageToByteEncoder<MapRemoveResponse> {
+ @Override
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final MapRemoveResponse mapRemoveResponse, final ByteBuf byteBuf) {
+ byteBuf.writeLong(mapRemoveResponse.getSize());
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
similarity index 55%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
index ce6718f6f5..d6fa60dd9c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapSizeResponseEncoder.java
@@ -14,24 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Message Encoder for Map Size Responses
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
-
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
- }
-
- public String value() {
- return operation;
+@ChannelHandler.Sharable
+public class MapSizeResponseEncoder extends MessageToByteEncoder<MapSizeResponse> {
+ @Override
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final MapSizeResponse mapSizeResponse, final ByteBuf byteBuf) {
+ byteBuf.writeInt(mapSizeResponse.getSize());
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java
new file mode 100644
index 0000000000..a0561a31bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapValueResponseEncoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
+
+/**
+ * Message Encoder for Map Value Responses
+ */
+@ChannelHandler.Sharable
+public class MapValueResponseEncoder extends MessageToByteEncoder<MapValueResponse> {
+ @Override
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final MapValueResponse mapValueResponse, final ByteBuf byteBuf) {
+ final Long revision = mapValueResponse.getRevision();
+ if (revision != null) {
+ byteBuf.writeLong(revision);
+ }
+ byteBuf.writeInt(mapValueResponse.getLength());
+
+ final byte[] value = mapValueResponse.getValue();
+ if (value != null) {
+ byteBuf.writeBytes(value);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java
new file mode 100644
index 0000000000..0b0987778d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/SetCacheRequestHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.codec;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+import org.apache.nifi.distributed.cache.operations.SetOperation;
+import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
+import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
+import org.apache.nifi.distributed.cache.server.set.SetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Handler for Set Cache Request operations interacts with the Set Cache and writes Results
+ */
+@ChannelHandler.Sharable
+public class SetCacheRequestHandler extends SimpleChannelInboundHandler<CacheRequest> {
+ private final ComponentLog log;
+
+ private final SetCache setCache;
+
+ public SetCacheRequestHandler(
+ final ComponentLog log,
+ final SetCache setCache
+ ) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.setCache = Objects.requireNonNull(setCache, "Set Cache required");
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheRequest cacheRequest) throws Exception {
+ final CacheOperation cacheOperation = cacheRequest.getCacheOperation();
+ final ByteBuffer body = ByteBuffer.wrap(cacheRequest.getBody());
+
+ final CacheOperationResult result;
+ if (SetOperation.ADD_IF_ABSENT == cacheOperation) {
+ result = getCacheOperationResult(setCache.addIfAbsent(body));
+ } else if (SetOperation.CONTAINS == cacheOperation) {
+ result = getCacheOperationResult(setCache.contains(body));
+ } else if (SetOperation.REMOVE == cacheOperation) {
+ result = getCacheOperationResult(setCache.remove(body));
+ } else {
+ result = null;
+ }
+
+ if (SetOperation.CLOSE == cacheOperation) {
+ log.debug("Set Cache Operation [{}] received", cacheOperation);
+ channelHandlerContext.close();
+ } else if (result == null) {
+ log.warn("Set Cache Operation [{}] not supported", cacheOperation);
+ } else {
+ log.debug("Set Cache Operation [{}] Success [{}]", cacheOperation, result.isSuccess());
+ channelHandlerContext.writeAndFlush(result);
+ }
+ }
+
+ private CacheOperationResult getCacheOperationResult(final SetCacheResult setCacheResult) {
+ return new CacheOperationResult(setCacheResult.getResult());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
index 7987e1485c..8c3a83a485 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -75,10 +75,23 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
}
}
- protected MapCacheServer createMapCacheServer(
- final int port, final int maxSize, final SSLContext sslContext, final EvictionPolicy evictionPolicy,
- final File persistenceDir, final int maxReadSize) throws IOException {
- return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
+ protected CacheServer createMapCacheServer(
+ final int port,
+ final int maxSize,
+ final SSLContext sslContext,
+ final EvictionPolicy evictionPolicy,
+ final File persistenceDir,
+ final int maxReadSize
+ ) throws IOException {
+ return new StandardMapCacheServer(
+ getLogger(),
+ getIdentifier(),
+ sslContext,
+ port,
+ maxSize,
+ evictionPolicy,
+ persistenceDir,
+ maxReadSize
+ );
}
-
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
deleted file mode 100644
index 8986183540..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-
-public class MapCacheServer extends AbstractCacheServer {
-
- private final MapCache cache;
-
- public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
- final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
- super(identifier, sslContext, port, maxReadSize);
-
- final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
-
- if (persistencePath == null) {
- this.cache = simpleCache;
- } else {
- final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
- persistentCache.restore();
- this.cache = persistentCache;
- }
- }
-
- /**
- * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
- * for details of each version enhancements.
- */
- protected StandardVersionNegotiator getVersionNegotiator() {
- return new StandardVersionNegotiator(3, 2, 1);
- }
-
- @Override
- protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
- final DataInputStream dis = new DataInputStream(in);
- final DataOutputStream dos = new DataOutputStream(out);
- final String action = dis.readUTF();
- try {
- switch (action) {
- case "close": {
- return false;
- }
- case "putIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- dos.writeBoolean(putResult.isSuccessful());
- break;
- }
- case "put": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
- cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- dos.writeBoolean(true);
- break;
- }
- case "containsKey": {
- final byte[] key = readValue(dis);
- final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
- dos.writeBoolean(contains);
- break;
- }
- case "getAndPutIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
-
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- if (putResult.isSuccessful()) {
- // Put was successful. There was no old value to get.
- dos.writeInt(0);
- } else {
- // we didn't put. Write back the previous value
- final byte[] byteArray = putResult.getExisting().getValue().array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "get": {
- final byte[] key = readValue(dis);
- final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
- if (existingValue == null) {
- // there was no existing value.
- dos.writeInt(0);
- } else {
- // a value already existed.
- final byte[] byteArray = existingValue.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "subMap": {
- final int numKeys = validateSize(dis.readInt());
- for(int i=0;i<numKeys;i++) {
- final byte[] key = readValue(dis);
- final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
- if (existingValue == null) {
- // there was no existing value.
- dos.writeInt(0);
- } else {
- // a value already existed.
- final byte[] byteArray = existingValue.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
- }
- break;
- }
- case "remove": {
- final byte[] key = readValue(dis);
- final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
- dos.writeBoolean(removed);
- break;
- }
- case "removeAndGet": {
- final byte[] key = readValue(dis);
- final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key));
- if (removed == null) {
- // there was no value removed
- dos.writeInt(0);
- } else {
- // reply with the value that was removed
- final byte[] byteArray = removed.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
- break;
- }
- case "removeByPattern": {
- final String pattern = dis.readUTF();
- final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
- dos.writeLong(removed == null ? 0 : removed.size());
- break;
- }
- case "removeByPatternAndGet": {
- final String pattern = dis.readUTF();
- final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
- if (removed == null || removed.size() == 0) {
- dos.writeLong(0);
- } else {
- // write the map size
- dos.writeInt(removed.size());
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
- // write map entry key
- final byte[] key = entry.getKey().array();
- dos.writeInt(key.length);
- dos.write(key);
- // write map entry value
- final byte[] value = entry.getValue().array();
- dos.writeInt(value.length);
- dos.write(value);
- }
- }
- break;
- }
- case "fetch": {
- final byte[] key = readValue(dis);
- final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
- if (existing == null) {
- // there was no existing value.
- dos.writeLong(-1);
- dos.writeInt(0);
- } else {
- // a value already existed.
- dos.writeLong(existing.getRevision());
- final byte[] byteArray = existing.getValue().array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "replace": {
- final byte[] key = readValue(dis);
- final long revision = dis.readLong();
- final byte[] value = readValue(dis);
- final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
- dos.writeBoolean(result.isSuccessful());
- break;
- }
- case "keySet": {
- final Set<ByteBuffer> result = cache.keySet();
- // write the set size
- dos.writeInt(result.size());
- // write each key in the set
- for (ByteBuffer bb : result) {
- final byte[] byteArray = bb.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
- break;
- }
- default: {
- throw new IOException("Illegal Request");
- }
- }
- } finally {
- dos.flush();
- }
-
- return true;
- }
-
- @Override
- public void stop() throws IOException {
- try {
- super.stop();
- } finally {
- cache.shutdown();
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!stopped) {
- stop();
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java
new file mode 100644
index 0000000000..f5e57fd5fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EventCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.MapRemoveResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapSizeResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapValueResponseEncoder;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.EventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+
+/**
+ * Standard Map Cache Server implemented using Netty
+ */
+public class StandardMapCacheServer extends EventCacheServer {
+ private final EventServerFactory eventServerFactory;
+
+ private final MapCache cache;
+
+ public StandardMapCacheServer(
+ final ComponentLog log,
+ final String identifier,
+ final SSLContext sslContext,
+ final int port,
+ final int maxCacheEntries,
+ final EvictionPolicy evictionPolicy,
+ final File persistencePath,
+ final int maxReadLength
+ ) throws IOException {
+ super(log, port);
+
+ final MapCache simpleCache = new SimpleMapCache(identifier, maxCacheEntries, evictionPolicy);
+
+ if (persistencePath == null) {
+ this.cache = simpleCache;
+ } else {
+ final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
+ persistentCache.restore();
+ this.cache = persistentCache;
+ }
+
+ final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
+
+ // Create Sharable Handlers to avoid unnecessary instantiation for each connection
+ final MapCacheRequestHandler mapCacheRequestHandler = new MapCacheRequestHandler(log, cache);
+ final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
+ final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
+ final MapRemoveResponseEncoder mapRemoveResponseEncoder = new MapRemoveResponseEncoder();
+ final MapSizeResponseEncoder mapSizeResponseEncoder = new MapSizeResponseEncoder();
+ final MapValueResponseEncoder mapValueResponseEncoder = new MapValueResponseEncoder();
+
+ final VersionNegotiator versionNegotiator = createVersionNegotiator();
+ nettyEventServerFactory.setHandlerSupplier(() ->
+ Arrays.asList(
+ cacheVersionResponseEncoder,
+ cacheOperationResultEncoder,
+ mapRemoveResponseEncoder,
+ mapSizeResponseEncoder,
+ mapValueResponseEncoder,
+ new MapCacheRequestDecoder(log, maxReadLength, MapOperation.values()),
+ mapCacheRequestHandler,
+ new CacheVersionRequestHandler(log, versionNegotiator)
+ )
+ );
+
+ this.eventServerFactory = nettyEventServerFactory;
+ }
+
+ @Override
+ public void stop() {
+ try {
+ cache.shutdown();
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Cache Shutdown Failed", e);
+ } finally {
+ super.stop();
+ }
+ }
+
+ @Override
+ protected EventServer createEventServer() {
+ return eventServerFactory.getEventServer();
+ }
+
+ protected VersionNegotiator createVersionNegotiator() {
+ return new StandardVersionNegotiator(
+ ProtocolVersion.V3.value(),
+ ProtocolVersion.V2.value(),
+ ProtocolVersion.V1.value()
+ );
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
similarity index 65%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
index ce6718f6f5..770deed4f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheOperationResult.java
@@ -14,24 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
-/**
- * Represents a distributed set cache operation which may be invoked.
- */
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
-
- private final String operation;
+public class CacheOperationResult {
+ private final boolean success;
- SetOperation(final String operation) {
- this.operation = operation;
+ public CacheOperationResult(
+ final boolean success
+ ) {
+ this.success = success;
}
- public String value() {
- return operation;
+ public boolean isSuccess() {
+ return success;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
similarity index 54%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
index ce6718f6f5..9bd6793822 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheRequest.java
@@ -14,24 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
+
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+
+import java.util.Objects;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Request Packet
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
+public class CacheRequest {
+ private final CacheOperation cacheOperation;
- private final String operation;
+ private final byte[] body;
+
+ public CacheRequest(
+ final CacheOperation cacheOperation,
+ final byte[] body
+ ) {
+ this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
+ this.body = Objects.requireNonNull(body, "Body required");
+ }
- SetOperation(final String operation) {
- this.operation = operation;
+ public CacheOperation getCacheOperation() {
+ return cacheOperation;
}
- public String value() {
- return operation;
+ public byte[] getBody() {
+ return body;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
index ce6718f6f5..9c69b573d0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionRequest.java
@@ -14,24 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Version Request contains the protocol version which the peer requested
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
+public class CacheVersionRequest {
+ private final int version;
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
+ public CacheVersionRequest(
+ final int version
+ ) {
+ this.version = version;
}
- public String value() {
- return operation;
+ public int getVersion() {
+ return version;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
similarity index 60%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
index ce6718f6f5..724803c203 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/CacheVersionResponse.java
@@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Cache Version Response contains the status code and optional requested version
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
+public class CacheVersionResponse {
+ private final int statusCode;
- private final String operation;
+ private final int version;
- SetOperation(final String operation) {
- this.operation = operation;
+ public CacheVersionResponse(
+ final int statusCode,
+ final int version
+ ) {
+ this.statusCode = statusCode;
+ this.version = version;
}
- public String value() {
- return operation;
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ public int getVersion() {
+ return version;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java
new file mode 100644
index 0000000000..0f6fe97543
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapCacheRequest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.protocol;
+
+import org.apache.nifi.distributed.cache.operations.CacheOperation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Map Cache Request with operation and other optional properties
+ */
+public class MapCacheRequest {
+ private final CacheOperation cacheOperation;
+
+ private byte[] key;
+
+ private byte[] value;
+
+ private String pattern;
+
+ private long revision;
+
+ private List<byte[]> keys = Collections.emptyList();
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation
+ ) {
+ this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
+ }
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation,
+ final byte[] key
+ ) {
+ this(cacheOperation);
+ this.key = Objects.requireNonNull(key, "Key required");
+ }
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation,
+ final byte[] key,
+ final byte[] value
+ ) {
+ this(cacheOperation, key);
+ this.value = Objects.requireNonNull(value, "Value required");
+ }
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation,
+ final byte[] key,
+ final long revision,
+ final byte[] value
+ ) {
+ this(cacheOperation, key, value);
+ this.revision = revision;
+ }
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation,
+ final String pattern
+ ) {
+ this(cacheOperation);
+ this.pattern = Objects.requireNonNull(pattern, "Pattern required");
+ }
+
+ public MapCacheRequest(
+ final CacheOperation cacheOperation,
+ final List<byte[]> keys
+ ) {
+ this(cacheOperation);
+ this.keys = Objects.requireNonNull(keys, "Keys required");
+ }
+
+ public CacheOperation getCacheOperation() {
+ return cacheOperation;
+ }
+
+ public byte[] getKey() {
+ return key;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public String getPattern() {
+ return pattern;
+ }
+
+ public long getRevision() {
+ return revision;
+ }
+
+ public List<byte[]> getKeys() {
+ return keys;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
index ce6718f6f5..f5a265cb18 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapRemoveResponse.java
@@ -14,24 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Map Remove Response
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
+public class MapRemoveResponse {
+ private final long size;
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
+ public MapRemoveResponse(
+ final long size
+ ) {
+ this.size = size;
}
- public String value() {
- return operation;
+ public long getSize() {
+ return size;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
similarity index 66%
copy from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
copy to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
index ce6718f6f5..9c124dc7a4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/SetOperation.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapSizeResponse.java
@@ -14,24 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.distributed.cache.operations;
+package org.apache.nifi.distributed.cache.server.protocol;
/**
- * Represents a distributed set cache operation which may be invoked.
+ * Map Size Response
*/
-public enum SetOperation {
- ADD_IF_ABSENT("addIfAbsent"),
- CONTAINS("contains"),
- REMOVE("remove"),
- CLOSE("close");
+public class MapSizeResponse {
+ private final int size;
- private final String operation;
-
- SetOperation(final String operation) {
- this.operation = operation;
+ public MapSizeResponse(
+ final int size
+ ) {
+ this.size = size;
}
- public String value() {
- return operation;
+ public int getSize() {
+ return size;
}
}
diff --git a/nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
similarity index 50%
rename from nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
rename to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
index 262cf54f61..6b52908f78 100644
--- a/nifi-commons/nifi-security-socket-ssl/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/protocol/MapValueResponse.java
@@ -14,39 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.remote.io.socket.ssl;
+package org.apache.nifi.distributed.cache.server.protocol;
-import java.io.IOException;
-import java.io.OutputStream;
+/**
+ * Map Value Response
+ */
+public class MapValueResponse {
+ private final int length;
-public class SSLSocketChannelOutputStream extends OutputStream {
+ private final byte[] value;
- private final SSLSocketChannel channel;
+ private Long revision;
- public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
- this.channel = channel;
+ public MapValueResponse(
+ final int length,
+ final byte[] value
+ ) {
+ this.length = length;
+ this.value = value;
}
- @Override
- public void write(final int b) throws IOException {
- channel.write(b);
+ public MapValueResponse(
+ final int length,
+ final byte[] value,
+ final Long revision
+ ) {
+ this(length, value);
+ this.revision = revision;
}
- @Override
- public void write(byte[] b) throws IOException {
- channel.write(b);
+ public int getLength() {
+ return length;
}
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- channel.write(b, off, len);
+ public byte[] getValue() {
+ return value;
}
- /**
- * Closes the underlying SSLSocketChannel, which also will close the InputStream and the connection
- */
- @Override
- public void close() throws IOException {
- channel.close();
+ public Long getRevision() {
+ return revision;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java
new file mode 100644
index 0000000000..d11b941f60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/StandardSetCacheServer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.set;
+
+import org.apache.nifi.distributed.cache.operations.SetOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EventCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheRequestDecoder;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.SetCacheRequestHandler;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.EventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+
+/**
+ * Standard Set Cache Server implementation based on Netty
+ */
+public class StandardSetCacheServer extends EventCacheServer {
+ private final EventServerFactory eventServerFactory;
+
+ private final SetCache cache;
+
+ public StandardSetCacheServer(
+ final ComponentLog log,
+ final String identifier,
+ final SSLContext sslContext,
+ final int port,
+ final int maxCacheEntries,
+ final EvictionPolicy evictionPolicy,
+ final File persistencePath,
+ final int maxReadLength
+ ) throws IOException {
+ super(log, port);
+
+ final SetCache simpleCache = new SimpleSetCache(identifier, maxCacheEntries, evictionPolicy);
+
+ if (persistencePath == null) {
+ this.cache = simpleCache;
+ } else {
+ final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
+ persistentCache.restore();
+ this.cache = persistentCache;
+ }
+
+ final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
+
+ // Create Sharable Handlers to avoid unnecessary instantiation for each connection
+ final SetCacheRequestHandler setCacheRequestHandler = new SetCacheRequestHandler(log, cache);
+ final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
+ final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
+
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V1.value());
+ nettyEventServerFactory.setHandlerSupplier(() ->
+ Arrays.asList(
+ cacheVersionResponseEncoder,
+ cacheOperationResultEncoder,
+ new CacheRequestDecoder(log, maxReadLength, SetOperation.values()),
+ setCacheRequestHandler,
+ new CacheVersionRequestHandler(log, versionNegotiator)
+ )
+ );
+
+ this.eventServerFactory = nettyEventServerFactory;
+ }
+
+ @Override
+ public void stop() {
+ try {
+ cache.shutdown();
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Cache Shutdown Failed", e);
+ } finally {
+ super.stop();
+ }
+ }
+
+ @Override
+ protected EventServer createEventServer() {
+ return eventServerFactory.getEventServer();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
index 42e9f568dc..d292159425 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java
@@ -26,9 +26,10 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import java.io.IOException;
@@ -39,18 +40,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Verify basic functionality of {@link DistributedMapCacheClientService}.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
- * is available.
- */
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(5)
public class DistributedMapCacheTest {
private static TestRunner runner = null;
@@ -59,8 +54,8 @@ public class DistributedMapCacheTest {
private static final Serializer<String> serializer = new StringSerializer();
private static final Deserializer<String> deserializer = new StringDeserializer();
- @BeforeClass
- public static void beforeClass() throws Exception {
+ @BeforeAll
+ public static void startServices() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
@@ -76,8 +71,8 @@ public class DistributedMapCacheTest {
runner.enableControllerService(client);
}
- @AfterClass
- public static void afterClass() {
+ @AfterAll
+ public static void shutdownServices() {
runner.disableControllerService(client);
runner.removeControllerService(client);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
index 9fddfaf97b..654468f817 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java
@@ -29,9 +29,9 @@ import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
@@ -40,18 +40,10 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * Verify basic functionality of {@link DistributedMapCacheClientService}, in the context of a TLS authenticated
- * socket session.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
- * is available.
- */
public class DistributedMapCacheTlsTest {
private static TestRunner runner = null;
@@ -61,8 +53,8 @@ public class DistributedMapCacheTlsTest {
private static final Serializer<String> serializer = new StringSerializer();
private static final Deserializer<String> deserializer = new StringDeserializer();
- @BeforeClass
- public static void beforeClass() throws Exception {
+ @BeforeAll
+ public static void setServices() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
sslContextService = createSslContextService();
@@ -83,8 +75,8 @@ public class DistributedMapCacheTlsTest {
runner.enableControllerService(client);
}
- @AfterClass
- public static void afterClass() {
+ @AfterAll
+ public static void shutdown() {
runner.disableControllerService(client);
runner.removeControllerService(client);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java
new file mode 100644
index 0000000000..12b6f0e51e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/StandardMapCacheServerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.net.ssl.SSLContext;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(5)
+@ExtendWith(MockitoExtension.class)
+class StandardMapCacheServerTest {
+ private static final String IDENTIFIER = StandardMapCacheServer.class.getSimpleName();
+
+ private static final SSLContext SSL_CONTEXT_DISABLED = null;
+
+ private static final int MAX_CACHE_ENTRIES = 32;
+
+ private static final EvictionPolicy EVICTION_POLICY = EvictionPolicy.FIFO;
+
+ private static final File PERSISTENCE_PATH_DISABLED = null;
+
+ private static final int MAX_READ_LENGTH = 4096;
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static final byte[] HEADER = new byte[]{'N', 'i', 'F', 'i'};
+
+ private static final byte[] KEY = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);
+
+ private static final int KEY_NOT_FOUND = 0;
+
+ private static final int PUT_COMPLETED = 1;
+
+ @Mock
+ ComponentLog log;
+
+ StandardMapCacheServer server;
+
+ @BeforeEach
+ void setServer() throws IOException {
+ final int port = NetworkUtils.getAvailableTcpPort();
+ server = new StandardMapCacheServer(
+ log,
+ IDENTIFIER,
+ SSL_CONTEXT_DISABLED,
+ port,
+ MAX_CACHE_ENTRIES,
+ EVICTION_POLICY,
+ PERSISTENCE_PATH_DISABLED,
+ MAX_READ_LENGTH
+ );
+ server.start();
+ }
+
+ @AfterEach
+ void stopServer() {
+ server.stop();
+ }
+
+ @Test
+ void testSocketContainsKeyValueDelayed() throws IOException, InterruptedException {
+ try (
+ final Socket socket = new Socket(LOCALHOST, server.getPort());
+ final InputStream inputStream = socket.getInputStream();
+ final OutputStream outputStream = socket.getOutputStream();
+ final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
+ ) {
+ sendHeaderVersion(dataOutputStream, inputStream);
+
+ dataOutputStream.writeUTF(MapOperation.CONTAINS_KEY.value());
+
+ // Delay writing key to simulate slow network connection
+ TimeUnit.MILLISECONDS.sleep(200);
+
+ dataOutputStream.writeInt(KEY.length);
+ dataOutputStream.write(KEY);
+
+ final int read = inputStream.read();
+ assertEquals(KEY_NOT_FOUND, read);
+ }
+ }
+
+ @Test
+ void testSocketPutGetMaxLength() throws IOException {
+ try (
+ final Socket socket = new Socket(LOCALHOST, server.getPort());
+ final InputStream inputStream = socket.getInputStream();
+ final DataInputStream dataInputStream = new DataInputStream(inputStream);
+ final OutputStream outputStream = socket.getOutputStream();
+ final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
+ ) {
+ sendHeaderVersion(dataOutputStream, inputStream);
+
+ dataOutputStream.writeUTF(MapOperation.PUT.value());
+
+ dataOutputStream.writeInt(KEY.length);
+ dataOutputStream.write(KEY);
+
+ final byte[] value = getValue();
+ dataOutputStream.writeInt(value.length);
+ dataOutputStream.write(value);
+
+ final int putStatus = inputStream.read();
+ assertEquals(PUT_COMPLETED, putStatus);
+
+ dataOutputStream.writeUTF(MapOperation.GET.value());
+
+ dataOutputStream.writeInt(KEY.length);
+ dataOutputStream.write(KEY);
+
+ final int valueLength = dataInputStream.readInt();
+ assertEquals(MAX_READ_LENGTH, valueLength);
+
+ final byte[] cachedValue = new byte[valueLength];
+ final int cachedValueLength = dataInputStream.read(cachedValue);
+ assertEquals(MAX_READ_LENGTH, cachedValueLength);
+
+ assertArrayEquals(value, cachedValue);
+ }
+ }
+
+ private void sendHeaderVersion(final DataOutputStream dataOutputStream, final InputStream inputStream) throws IOException {
+ dataOutputStream.write(HEADER);
+ dataOutputStream.writeInt(ProtocolVersion.V3.value());
+
+ final int protocolResponse = inputStream.read();
+ assertEquals(ProtocolHandshake.RESOURCE_OK, protocolResponse);
+ }
+
+ private byte[] getValue() {
+ final SecureRandom secureRandom = new SecureRandom();
+ final byte[] value = new byte[MAX_READ_LENGTH];
+ secureRandom.nextBytes(value);
+ return value;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
index dc2d0ab3ac..d32946e83c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -25,6 +25,8 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.distributed.cache.server.CacheServer;
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.processor.DataUnit;
@@ -224,11 +226,11 @@ public class TestDistributedMapServerAndClient {
// Create a server that only supports protocol version 1.
server = new DistributedMapCacheServer() {
@Override
- protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
- return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
+ protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
+ return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
@Override
- protected StandardVersionNegotiator getVersionNegotiator() {
- return new StandardVersionNegotiator(1);
+ protected StandardVersionNegotiator createVersionNegotiator() {
+ return new StandardVersionNegotiator(ProtocolVersion.V1.value());
}
};
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
index a5b6342dd1..17612a301e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java
@@ -24,25 +24,18 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * Verify basic functionality of {@link DistributedSetCacheClientService}.
- * <p>
- * This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
- * implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
- * is available.
- */
public class DistributedSetCacheTest {
private static TestRunner runner = null;
@@ -50,8 +43,8 @@ public class DistributedSetCacheTest {
private static DistributedSetCacheClientService client = null;
private static final Serializer<String> serializer = new StringSerializer();
- @BeforeClass
- public static void beforeClass() throws Exception {
+ @BeforeAll
+ public static void setRunner() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
@@ -67,8 +60,8 @@ public class DistributedSetCacheTest {
runner.enableControllerService(client);
}
- @AfterClass
- public static void afterClass() {
+ @AfterAll
+ public static void shutdown() {
runner.disableControllerService(client);
runner.removeControllerService(client);