You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:57 UTC
[34/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
new file mode 100644
index 0000000..1d7c94c
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Controller service that talks to a remote DistributedSetCacheServer over a
+ * plain or SSL-secured socket. Established sessions are pooled in a queue and
+ * reused across calls; a session is discarded as soon as an I/O error occurs
+ * on it.
+ */
+public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
+
+    // Log under this class's own name (the original referenced the map client by mistake).
+    private static final Logger logger = LoggerFactory.getLogger(DistributedSetCacheClientService.class);
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Server Hostname")
+            .description("The name of the server that is running the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Server Port")
+            .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .defaultValue(null)
+            .build();
+    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description(
+                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    /** Pool of idle sessions that have already completed the protocol handshake. */
+    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+    private volatile ConfigurationContext configContext;
+    private volatile boolean closed = false;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(COMMUNICATIONS_TIMEOUT);
+        return descriptors;
+    }
+
+    /**
+     * Remembers the configuration so that sessions can be created lazily later on.
+     *
+     * @param context the configuration this service was configured with
+     */
+    @OnConfigured
+    public void onConfigured(final ConfigurationContext context) {
+        this.configContext = context;
+    }
+
+    /**
+     * Opens a new session to the configured host and port. An SSL session is
+     * used when an SSL Context Service is configured, a plain one otherwise.
+     * The configured communications timeout is applied to the session. No
+     * protocol handshake is performed here.
+     *
+     * @param context the configuration to read connection settings from
+     * @return a newly connected session
+     * @throws IOException if the connection cannot be established
+     */
+    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+        final String hostname = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final CommsSession commsSession;
+        if (sslContextService == null) {
+            commsSession = new StandardCommsSession(hostname, port);
+        } else {
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+        }
+
+        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+        return commsSession;
+    }
+
+    /**
+     * Returns a usable session: a pooled one if it is still open, otherwise a
+     * freshly created session on which the protocol handshake has completed.
+     *
+     * @return a session ready for a request
+     * @throws IOException if connecting or handshaking fails; the new session
+     *         is closed before the exception propagates
+     */
+    private CommsSession leaseCommsSession() throws IOException {
+        final CommsSession pooled = queue.poll();
+        if (pooled != null && !pooled.isClosed()) {
+            return pooled;
+        }
+
+        final CommsSession session = createCommsSession(configContext);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        try {
+            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+        } catch (final HandshakeException e) {
+            closeQuietly(session);
+            throw new IOException(e);
+        } catch (final IOException e) {
+            // Without this the freshly opened socket would leak on a failed handshake.
+            closeQuietly(session);
+            throw e;
+        }
+
+        return session;
+    }
+
+    @Override
+    public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("addIfAbsent", value, serializer);
+    }
+
+    @Override
+    public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("contains", value, serializer);
+    }
+
+    @Override
+    public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
+        return invokeRemoteBoolean("remove", value, serializer);
+    }
+
+    /**
+     * Marks the client as closed and shuts down every pooled session, sending
+     * a "close" command to the server first on a best-effort basis.
+     */
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+
+        CommsSession commsSession;
+        while ((commsSession = queue.poll()) != null) {
+            try {
+                final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream());
+                dos.writeUTF("close");
+                dos.flush();
+            } catch (final IOException e) {
+                // Best effort only: the server may already have dropped the connection.
+                logger.debug("Failed to notify server that session is closing", e);
+            } finally {
+                // Always release the socket, even if the notification could not be sent.
+                closeQuietly(commsSession);
+            }
+        }
+        logger.info("Closed {}", new Object[] { getIdentifier() });
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            if (!closed) {
+                close();
+            }
+            logger.debug("Finalize called");
+        } finally {
+            super.finalize();
+        }
+    }
+
+    /**
+     * Sends one request and reads the boolean reply. The request consists of
+     * the method name (UTF), the length of the serialized value (int) and the
+     * serialized bytes.
+     *
+     * @param methodName the remote operation to invoke
+     * @param value the value to send
+     * @param serializer turns {@code value} into bytes
+     * @return the boolean answer read from the server
+     * @throws IOException if communication fails; the session is closed and not reused
+     * @throws IllegalStateException if this client has been closed
+     */
+    private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+
+        final CommsSession session = leaseCommsSession();
+        try {
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF(methodName);
+
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            serializer.serialize(value, baos);
+            dos.writeInt(baos.size());
+            baos.writeTo(dos);
+            dos.flush();
+
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            return dis.readBoolean();
+        } catch (final IOException ioe) {
+            // The stream state is unknown after a failure, so never return this session to the pool.
+            closeQuietly(session);
+            throw ioe;
+        } finally {
+            if (!session.isClosed()) {
+                if (this.closed) {
+                    // close() may have run while this request was in flight.
+                    closeQuietly(session);
+                } else {
+                    queue.offer(session);
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the given session, logging rather than propagating any failure.
+     */
+    private static void closeQuietly(final CommsSession session) {
+        try {
+            session.close();
+        } catch (final IOException ignored) {
+            // Nothing useful can be done if closing fails; the session is being discarded anyway.
+            logger.debug("Failed to close communications session", ignored);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
new file mode 100644
index 0000000..c8be082
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+/**
+ * {@link CommsSession} backed by an {@link SSLSocketChannel}. Input and output
+ * are exposed through buffered streams layered over the channel.
+ */
+public class SSLCommsSession implements CommsSession {
+    private final SSLSocketChannel sslSocketChannel;
+    private final SSLContext sslContext;
+    private final String hostname;
+    private final int port;
+
+    private final SSLSocketChannelInputStream in;
+    private final BufferedInputStream bufferedIn;
+
+    private final SSLSocketChannelOutputStream out;
+    private final BufferedOutputStream bufferedOut;
+
+    /**
+     * Creates the SSL channel for the given host and port and wraps it in
+     * buffered streams.
+     *
+     * @param sslContext the SSL context used to create the channel
+     * @param hostname the remote host
+     * @param port the remote port
+     * @throws IOException if the channel cannot be created
+     */
+    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
+        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+
+        in = new SSLSocketChannelInputStream(sslSocketChannel);
+        bufferedIn = new BufferedInputStream(in);
+
+        out = new SSLSocketChannelOutputStream(sslSocketChannel);
+        bufferedOut = new BufferedOutputStream(out);
+
+        this.sslContext = sslContext;
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    @Override
+    public void interrupt() {
+        sslSocketChannel.interrupt();
+    }
+
+    @Override
+    public void close() throws IOException {
+        sslSocketChannel.close();
+    }
+
+    /**
+     * Sets the channel timeout. The channel takes an {@code int} number of
+     * milliseconds, so larger values are capped at {@link Integer#MAX_VALUE}
+     * instead of wrapping around during the narrowing cast.
+     */
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        final long millis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+        sslSocketChannel.setTimeout((int) Math.min(millis, Integer.MAX_VALUE));
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return sslSocketChannel.isClosed();
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public SSLContext getSSLContext() {
+        return sslContext;
+    }
+
+    /**
+     * @return the channel's timeout, which it reports in milliseconds,
+     *         converted to the requested unit
+     */
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
new file mode 100644
index 0000000..bbe2917
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.io.InterruptableOutputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+/**
+ * {@link CommsSession} backed by a non-blocking, unencrypted
+ * {@link SocketChannel}. Input and output are exposed through buffered,
+ * interruptable streams layered over the channel.
+ */
+public class StandardCommsSession implements CommsSession {
+    private final SocketChannel socketChannel;
+    private final String hostname;
+    private final int port;
+    private volatile long timeoutMillis;
+
+    private final SocketChannelInputStream in;
+    private final InterruptableInputStream bufferedIn;
+
+    private final SocketChannelOutputStream out;
+    private final InterruptableOutputStream bufferedOut;
+
+    /**
+     * Connects to the given host and port and switches the channel to
+     * non-blocking mode.
+     *
+     * @param hostname the remote host
+     * @param port the remote port
+     * @throws IOException if the connection cannot be established or
+     *         configured; the channel is closed before the exception propagates
+     */
+    public StandardCommsSession(final String hostname, final int port) throws IOException {
+        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+        try {
+            socketChannel.configureBlocking(false);
+        } catch (final IOException e) {
+            // Do not leak the connected socket if it cannot be configured.
+            try {
+                socketChannel.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        in = new SocketChannelInputStream(socketChannel);
+        bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
+
+        out = new SocketChannelOutputStream(socketChannel);
+        bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
+
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    @Override
+    public void interrupt() {
+        bufferedIn.interrupt();
+        bufferedOut.interrupt();
+    }
+
+    @Override
+    public void close() throws IOException {
+        socketChannel.close();
+    }
+
+    /**
+     * Applies the timeout to both the input and the output stream. The streams
+     * take an {@code int} number of milliseconds, so larger values are capped
+     * at {@link Integer#MAX_VALUE} instead of wrapping around during the
+     * narrowing cast. The capped value is what {@link #getTimeout(TimeUnit)}
+     * reports.
+     */
+    @Override
+    public void setTimeout(final long value, final TimeUnit timeUnit) {
+        final long requestedMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+        final int effectiveMillis = (int) Math.min(requestedMillis, Integer.MAX_VALUE);
+        in.setTimeout(effectiveMillis);
+        out.setTimeout(effectiveMillis);
+        timeoutMillis = effectiveMillis;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return bufferedIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return bufferedOut;
+    }
+
+    /**
+     * Reports whether the session is unusable. Besides checking the channel's
+     * connected flag, this probes the input stream; if the probe fails with an
+     * I/O error the session is closed and reported as closed.
+     */
+    @Override
+    public boolean isClosed() {
+        if (!socketChannel.isConnected()) {
+            return true;
+        }
+
+        try {
+            this.in.isDataAvailable();
+            return false;
+        } catch (final IOException probeFailure) {
+            try {
+                close();
+            } catch (final IOException ignored) {
+                // The session is already considered dead; a failed close changes nothing.
+            }
+            return true;
+        }
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * @return always {@code null}; this session is not encrypted
+     */
+    @Override
+    public SSLContext getSSLContext() {
+        return null;
+    }
+
+    @Override
+    public long getTimeout(final TimeUnit timeUnit) {
+        return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..a91f7ee
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
+org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
new file mode 100644
index 0000000..d5f3595
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
@@ -0,0 +1,78 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+<meta charset="utf-8" />
+<title>Distributed Map Cache Client Service</title>
+<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+ <h2>Description:</h2>
+
+ <p>A Controller Service that can be used to communicate with a
+ Distributed Map Cache Server.</p>
+
+
+
+ <p>
+ <strong>Properties:</strong>
+ </p>
+ <p>In the list below, the names of required properties appear
+ in bold. Any other properties (not in bold) are considered optional.
+ If a property has a default value, it is indicated. If a property
+ supports the use of the NiFi Expression Language (or simply,
+ "expression language"), that is also indicated.</p>
+
+ <ul>
+ <li><strong>Server Hostname</strong>
+ <ul>
+ <li>The name of the server that is running the DistributedMapCacheServer service</li>
+ <li>Default value: no default</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Server Port</strong>
+ <ul>
+ <li>The port on the remote server that is to be used when communicating with the
+ <a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li>
+
+ <li>Default value: 4557</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li>SSL Context Service
+ <ul>
+ <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted</li>
+ <li>Default value: no default</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Communications Timeout</strong>
+ <ul>
+ <li>Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received</li>
+ <li>Default value: 30 secs</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+
+ </ul>
+
+
+ <i>See Also:</i>
+ <ul>
+ <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li>
+ <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
+ </ul>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
new file mode 100755
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
new file mode 100644
index 0000000..bc612ae
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-protocol</artifactId>
+ <name>Distributed Cache Protocol</name>
+
+ <description>
+ Defines the communications protocol that is used between clients and servers
+ for the Distributed Cache services
+ </description>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
new file mode 100644
index 0000000..da2acad
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+
+/**
+ * Static helpers implementing the handshake performed when a Distributed Cache
+ * connection is opened: the initiator sends a magic header followed by its
+ * protocol version, and both sides then negotiate a version they both support.
+ */
+public class ProtocolHandshake {
+
+    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };
+
+    public static final int RESOURCE_OK = 20;
+    public static final int DIFFERENT_RESOURCE_VERSION = 21;
+    public static final int ABORT = 255;
+
+
+    /**
+     * Performs the initiating side of the handshake: writes the magic header
+     * and then negotiates the protocol version with the peer.
+     *
+     * @param in stream to read the peer's responses from
+     * @param out stream to write the header and version proposals to
+     * @param versionNegotiator supplies the proposed version and receives the agreed one
+     * @throws IOException if reading or writing fails
+     * @throws HandshakeException if no version can be agreed on, the peer
+     *         aborts, or the peer sends an unexpected status code
+     */
+    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+
+        try {
+            dos.write(MAGIC_HEADER);
+
+            initiateVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+
+
+    /**
+     * Performs the receiving side of the handshake: reads and verifies the
+     * magic header and then answers the peer's version proposals.
+     *
+     * @param in stream to read the header and version proposals from
+     * @param out stream to write responses to
+     * @param versionNegotiator decides which versions are acceptable and receives the agreed one
+     * @throws IOException if reading or writing fails
+     * @throws HandshakeException if the magic header is wrong or no version can be agreed on
+     */
+    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+
+        try {
+            final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
+            dis.readFully(magicHeaderBuffer);
+
+            // Reject peers that do not speak this protocol instead of
+            // interpreting their bytes as a version number.
+            for (int i = 0; i < MAGIC_HEADER.length; i++) {
+                if (magicHeaderBuffer[i] != MAGIC_HEADER[i]) {
+                    throw new HandshakeException("Handshake failed because the expected magic header was not received");
+                }
+            }
+
+            receiveVersionNegotiation(versionNegotiator, dis, dos);
+        } finally {
+            dos.flush();
+        }
+    }
+
+
+    /**
+     * Proposes the negotiator's current version and handles the peer's status
+     * response, retrying with a lower version when the peer asks for one.
+     */
+    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        // Propose our current protocol version.
+        dos.writeInt(negotiator.getVersion());
+        dos.flush();
+
+        // wait for response from server.
+        final int statusCode = dis.read();
+        switch (statusCode) {
+            case RESOURCE_OK: // server accepted the proposed version
+                return;
+            case DIFFERENT_RESOURCE_VERSION: // server rejected the proposed version
+                // Get server's preferred version
+                final int newVersion = dis.readInt();
+
+                // Determine our new preferred version that is no greater than the server's preferred version.
+                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+                // If we could not agree with server on a version, fail now.
+                if ( newPreference == null ) {
+                    throw new HandshakeException("Could not agree on protocol version");
+                }
+
+                negotiator.setVersion(newPreference);
+
+                // Attempt negotiation again based on our new preferred version.
+                initiateVersionNegotiation(negotiator, dis, dos);
+                // The renegotiation succeeded if we get here; returning prevents
+                // falling through into the ABORT case below.
+                return;
+            case ABORT:
+                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            default:
+                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
+        }
+    }
+
+    /**
+     * Reads the peer's proposed version and either accepts it, answers with a
+     * preferred version and waits for the next proposal, or aborts when no
+     * acceptable version exists.
+     */
+    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+        final int version = dis.readInt();
+        if ( negotiator.isVersionSupported(version) ) {
+            dos.write(RESOURCE_OK);
+            dos.flush();
+
+            negotiator.setVersion(version);
+        } else {
+            final Integer preferred = negotiator.getPreferredVersion(version);
+            if ( preferred == null ) {
+                final String message = "Unable to negotiate an acceptable version of the Distributed Cache Protocol";
+                dos.write(ABORT);
+                // The initiating side reads a UTF message after ABORT, so send one.
+                dos.writeUTF(message);
+                dos.flush();
+                throw new HandshakeException(message);
+            }
+            dos.write(DIFFERENT_RESOURCE_VERSION);
+            dos.writeInt(preferred);
+            dos.flush();
+
+            receiveVersionNegotiation(negotiator, dis, dos);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
new file mode 100644
index 0000000..8049d42
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol.exception;
+
+/**
+ * Checked exception thrown when the protocol handshake with a remote peer cannot be
+ * completed, for example because no common protocol version could be negotiated.
+ */
+public class HandshakeException extends Exception {
+
+    // Exceptions are Serializable; pin the stream version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of why the handshake failed
+     */
+    public HandshakeException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of why the handshake failed
+     * @param cause the underlying failure, preserved for diagnostics
+     */
+    public HandshakeException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying failure that prevented the handshake
+     */
+    public HandshakeException(final Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
new file mode 100644
index 0000000..5dec322
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-server</artifactId>
+
+ <name>Distributed Cache Server</name>
+ <description>Provides a Controller Service for hosting Distributed Caches</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-protocol</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-stream-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>wali</groupId>
+ <artifactId>wali</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
new file mode 100644
index 0000000..9b4e70e
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Skeleton {@link CacheServer} that accepts socket connections on a configured port and
+ * serves every connection on its own daemon thread. After the protocol handshake, a
+ * connection is handed to {@link #listen(InputStream, OutputStream, int)} repeatedly until
+ * that method returns <code>false</code> or communication fails.
+ * <p>
+ * When an {@link SSLContext} is supplied, connections are wrapped in an SSL channel;
+ * otherwise the plain socket channel is used.
+ */
+public abstract class AbstractCacheServer implements CacheServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
+
+    /** Longest time, in milliseconds, that {@link #stop()} waits for one communications thread to exit. */
+    private static final long THREAD_TERMINATION_WAIT_MILLIS = 250L;
+
+    private final String identifier;
+    private final int port;
+    private final SSLContext sslContext;
+    protected volatile boolean stopped = false;
+    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
+
+    private volatile ServerSocketChannel serverSocketChannel;
+
+    /**
+     * @param identifier identifier used in thread names and log messages
+     * @param sslContext SSL context used to secure connections, or <code>null</code> for plain sockets
+     * @param port port on which to accept connections
+     */
+    public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
+        this.identifier = identifier;
+        this.port = port;
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Binds the server socket and starts a daemon thread that accepts connections. Each
+     * accepted connection is processed on a dedicated daemon thread.
+     *
+     * @throws IOException if the server socket cannot be opened or bound
+     */
+    @Override
+    public void start() throws IOException {
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.configureBlocking(true);
+        serverSocketChannel.bind(new InetSocketAddress(port));
+
+        final Runnable runnable = new Runnable() {
+
+            @Override
+            public void run() {
+                while (true) {
+                    final SocketChannel socketChannel;
+                    try {
+                        socketChannel = serverSocketChannel.accept();
+                        logger.debug("Connected to {}", new Object[] { socketChannel });
+                    } catch (final IOException e) {
+                        if (!stopped) {
+                            // Qualify 'this': inside the anonymous Runnable a bare 'this' would log the Runnable.
+                            logger.error("{} unable to accept connection from remote peer due to {}", AbstractCacheServer.this, e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
+                        }
+                        // Accepting failed (or the server socket was closed by stop()): end the accept loop.
+                        return;
+                    }
+
+                    final Runnable processInputRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+                            final InputStream rawInputStream;
+                            final OutputStream rawOutputStream;
+                            final String peer = socketChannel.socket().getInetAddress().getHostName();
+
+                            try {
+                                if (sslContext == null) {
+                                    rawInputStream = new SocketChannelInputStream(socketChannel);
+                                    rawOutputStream = new SocketChannelOutputStream(socketChannel);
+                                } else {
+                                    final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
+                                    sslSocketChannel.connect();
+                                    rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
+                                    rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
+                                }
+                            } catch (IOException e) {
+                                // Pass the identifier directly (not wrapped in an array) so it is rendered
+                                // as-is; the trailing exception is logged with its stack trace.
+                                logger.error("Cannot create input and/or output streams for {}", identifier, e);
+                                try {
+                                    socketChannel.close();
+                                } catch (IOException swallow) {
+                                    // best effort: the connection is being abandoned anyway
+                                }
+
+                                return;
+                            }
+                            try (final InputStream in = new BufferedInputStream(rawInputStream);
+                                    final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+
+                                final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+
+                                ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
+
+                                boolean continueComms = true;
+                                while (continueComms) {
+                                    continueComms = listen(in, out, versionNegotiator.getVersion());
+                                }
+                                // client has issued 'close'
+                                logger.debug("Client issued close on {}", new Object[] { socketChannel });
+                            } catch (final SocketTimeoutException e) {
+                                logger.debug("30 sec timeout reached", e);
+                            } catch (final IOException | HandshakeException e) {
+                                if (!stopped) {
+                                    logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { AbstractCacheServer.this, peer, e.toString() });
+                                    if (logger.isDebugEnabled()) {
+                                        logger.error("", e);
+                                    }
+                                }
+                            } finally {
+                                processInputThreads.remove(Thread.currentThread());
+                            }
+                        }
+                    };
+
+                    final Thread processInputThread = new Thread(processInputRunnable);
+                    processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
+                    processInputThread.setDaemon(true);
+                    // Register before starting: otherwise a short-lived thread could deregister itself
+                    // before it was registered, leaving a stale entry behind.
+                    processInputThreads.add(processInputThread);
+                    processInputThread.start();
+                }
+            }
+        };
+
+        final Thread thread = new Thread(runnable);
+        thread.setDaemon(true);
+        thread.setName("Distributed Cache Server: " + identifier);
+        thread.start();
+    }
+
+    /**
+     * Stops accepting connections and interrupts all communications threads, waiting a
+     * bounded amount of time for each of them to terminate.
+     *
+     * @throws IOException if closing the server socket fails
+     */
+    @Override
+    public void stop() throws IOException {
+        stopped = true;
+        logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
+
+        if (serverSocketChannel != null) {
+            serverSocketChannel.close();
+        }
+        // need to close out the created SocketChannels...this is done by interrupting
+        // the created threads that loop on listen().
+        for (final Thread processInputThread : processInputThreads) {
+            processInputThread.interrupt();
+        }
+        for (final Thread processInputThread : processInputThreads) {
+            try {
+                // allow thread to gracefully terminate; join() returns as soon as it has exited
+                processInputThread.join(THREAD_TERMINATION_WAIT_MILLIS);
+            } catch (final InterruptedException e) {
+                // Preserve the caller's interrupt and stop waiting; all threads were already interrupted.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        processInputThreads.clear();
+    }
+
+    @Override
+    public String toString() {
+        return "CacheServer[id=" + identifier + "]";
+    }
+
+    /**
+     * Listens for incoming data and communicates with remote peer
+     *
+     * @param in stream from which the peer's request is read
+     * @param out stream to which the response is written
+     * @param version protocol version agreed on during the handshake
+     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
+     * @throws IOException if communication with the peer fails
+     */
+    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
new file mode 100644
index 0000000..71ac56d
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Bookkeeping attached to a cache entry: a unique id, the time the record was created,
+ * the time of the most recent hit and the number of hits so far.
+ */
+public class CacheRecord {
+
+    /** Source of ids; every new record takes the next value. */
+    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
+
+    private final AtomicInteger hitCount = new AtomicInteger(0);
+    private final long entryDate;
+    private final long id;
+    private volatile long lastHitDate;
+
+    /**
+     * Creates a record stamped with the current time; the last-hit time starts out equal
+     * to the entry time and the hit count starts at zero.
+     */
+    public CacheRecord() {
+        final long now = System.currentTimeMillis();
+        this.entryDate = now;
+        this.lastHitDate = now;
+        this.id = ID_SEQUENCE.getAndIncrement();
+    }
+
+    /**
+     * @return creation time of this record, in milliseconds since the epoch
+     */
+    public long getEntryDate() {
+        return this.entryDate;
+    }
+
+    /**
+     * @return time of the most recent {@link #hit()}, or the entry time if there was none
+     */
+    public long getLastHitDate() {
+        return this.lastHitDate;
+    }
+
+    /**
+     * @return number of times {@link #hit()} has been called
+     */
+    public int getHitCount() {
+        return this.hitCount.get();
+    }
+
+    /**
+     * Registers a hit: bumps the hit count and moves the last-hit time to now.
+     */
+    public void hit() {
+        this.hitCount.incrementAndGet();
+        this.lastHitDate = System.currentTimeMillis();
+    }
+
+    /**
+     * @return the unique id assigned to this record at construction
+     */
+    public long getId() {
+        return this.id;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
new file mode 100644
index 0000000..2c85cd8
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+
+/**
+ * Lifecycle contract of a cache server: it can be started and stopped.
+ */
+public interface CacheServer {
+
+    /**
+     * Starts the server.
+     *
+     * @throws IOException if the server cannot be started
+     */
+    void start() throws IOException;
+
+    /**
+     * Stops the server.
+     *
+     * @throws IOException if the server cannot be stopped cleanly
+     */
+    void stop() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
new file mode 100644
index 0000000..0f962d0
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.processor.annotation.OnShutdown;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+/**
+ * Base controller service that hosts a {@link CacheServer}. It declares the configuration
+ * properties shared by the concrete servers, creates and starts the server when the
+ * service is configured, and stops it on shutdown. Subclasses decide which kind of
+ * server is created via {@link #createCacheServer(ConfigurationContext)}.
+ */
+public abstract class DistributedCacheServer extends AbstractControllerService {
+    public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
+    public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
+    public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Port")
+            .description("The port to listen on for incoming connections")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .build();
+    public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
+            .name("Maximum Cache Entries")
+            .description("The maximum number of cache entries that the cache can hold")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .build();
+    public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
+            .name("Eviction Strategy")
+            .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
+            .required(true)
+            .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
+            .defaultValue(EVICTION_STRATEGY_LFU)
+            .build();
+    public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
+            .name("Persistence Directory")
+            .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
+            .required(false)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+            .build();
+
+    private volatile CacheServer cacheServer;
+
+    /**
+     * @return the supported properties; the SSL Context Service property is rebuilt on each
+     *         call with the identifiers of the currently known SSL context services as its
+     *         allowable values
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PORT);
+        properties.add(MAX_CACHE_ENTRIES);
+        properties.add(EVICTION_POLICY);
+        properties.add(PERSISTENCE_PATH);
+        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
+                getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
+        return properties;
+    }
+
+    /**
+     * Creates and starts the cache server unless one is already held by this service.
+     *
+     * @param context configuration from which the server is created
+     * @throws IOException if the server cannot be started
+     */
+    @OnConfigured
+    public void startServer(final ConfigurationContext context) throws IOException {
+        if (cacheServer == null) {
+            cacheServer = createCacheServer(context);
+            cacheServer.start();
+        }
+    }
+
+    /**
+     * Stops the cache server, if any, and forgets it.
+     *
+     * @throws IOException if stopping the server fails
+     */
+    @OnShutdown
+    public void shutdownServer() throws IOException {
+        final CacheServer server = cacheServer;
+        // Drop the reference before stopping: if stop() throws, a half-stopped server must not
+        // stay registered, because startServer() only creates a server when none is held.
+        cacheServer = null;
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    /**
+     * Last-resort cleanup that stops the server if the service is garbage collected
+     * without having been shut down.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            shutdownServer();
+        } finally {
+            // Always give the superclass its turn, even if shutting down failed.
+            super.finalize();
+        }
+    }
+
+    /**
+     * Creates the cache server described by the given configuration. The returned server
+     * is started by {@link #startServer(ConfigurationContext)}.
+     *
+     * @param context configuration of this service
+     * @return a new cache server that has not been started yet
+     */
+    protected abstract CacheServer createCacheServer(ConfigurationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
new file mode 100644
index 0000000..426573f
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+public class DistributedSetCacheServer extends DistributedCacheServer {
+
+ @Override
+ protected CacheServer createCacheServer(final ConfigurationContext context) {
+ final int port = context.getProperty(PORT).asInteger();
+ final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+ final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+
+ final SSLContext sslContext;
+ if ( sslContextService == null ) {
+ sslContext = null;
+ } else {
+ sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
+ }
+
+ final EvictionPolicy evictionPolicy;
+ switch (evictionPolicyName) {
+ case EVICTION_STRATEGY_FIFO:
+ evictionPolicy = EvictionPolicy.FIFO;
+ break;
+ case EVICTION_STRATEGY_LFU:
+ evictionPolicy = EvictionPolicy.LFU;
+ break;
+ case EVICTION_STRATEGY_LRU:
+ evictionPolicy = EvictionPolicy.LRU;
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+ }
+
+ try {
+ final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
+
+ return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
new file mode 100644
index 0000000..60bd2c1
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.Comparator;
+
+/**
+ * Eviction strategies, each paired with the comparator that orders {@link CacheRecord}s
+ * for that strategy. Every comparator falls back to the record id as the final
+ * tie-breaker, so two records compare as 0 only when <code>equals</code> says they are equal.
+ */
+public enum EvictionPolicy {
+    LFU(new LFUComparator()),
+    LRU(new LRUComparator()),
+    FIFO(new FIFOComparator());
+
+    private final Comparator<CacheRecord> comparator;
+
+    private EvictionPolicy(final Comparator<CacheRecord> comparator) {
+        this.comparator = comparator;
+    }
+
+    /**
+     * @return the comparator that orders records according to this policy
+     */
+    public Comparator<CacheRecord> getComparator() {
+        return comparator;
+    }
+
+    /**
+     * Orders by hit count, then by entry date, then by id.
+     */
+    public static class LFUComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if (o1.equals(o2)) {
+                return 0;
+            }
+
+            final int byHitCount = Integer.compare(o1.getHitCount(), o2.getHitCount());
+            if (byHitCount != 0) {
+                return byHitCount;
+            }
+
+            final int byEntryDate = Long.compare(o1.getEntryDate(), o2.getEntryDate());
+            if (byEntryDate != 0) {
+                return byEntryDate;
+            }
+
+            return Long.compare(o1.getId(), o2.getId());
+        }
+    }
+
+    /**
+     * Orders by last hit date, then by id.
+     */
+    public static class LRUComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if (o1.equals(o2)) {
+                return 0;
+            }
+
+            final int byLastHit = Long.compare(o1.getLastHitDate(), o2.getLastHitDate());
+            if (byLastHit != 0) {
+                return byLastHit;
+            }
+
+            return Long.compare(o1.getId(), o2.getId());
+        }
+    }
+
+    /**
+     * Orders by entry date, then by id.
+     */
+    public static class FIFOComparator implements Comparator<CacheRecord> {
+        @Override
+        public int compare(final CacheRecord o1, final CacheRecord o2) {
+            if (o1.equals(o2)) {
+                return 0;
+            }
+
+            final int byEntryDate = Long.compare(o1.getEntryDate(), o2.getEntryDate());
+            if (byEntryDate != 0) {
+                return byEntryDate;
+            }
+
+            return Long.compare(o1.getId(), o2.getId());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
new file mode 100644
index 0000000..5d2c0f6
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCache;
+import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
+import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
+import org.apache.nifi.io.DataOutputStream;
+
+public class SetCacheServer extends AbstractCacheServer {
+
+ private final SetCache cache;
+
+ public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+ final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+ super(identifier, sslContext, port);
+
+ final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
+
+ if (persistencePath == null) {
+ this.cache = simpleCache;
+ } else {
+ final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
+ persistentCache.restore();
+ this.cache = persistentCache;
+ }
+ }
+
+ @Override
+ protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+ final DataInputStream dis = new DataInputStream(in);
+ final DataOutputStream dos = new DataOutputStream(out);
+
+ final String action = dis.readUTF();
+ if (action.equals("close")) {
+ return false;
+ }
+
+ final int valueLength = dis.readInt();
+ final byte[] value = new byte[valueLength];
+ dis.readFully(value);
+ final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
+
+ final SetCacheResult response;
+ switch (action) {
+ case "addIfAbsent":
+ response = cache.addIfAbsent(valueBuffer);
+ break;
+ case "contains":
+ response = cache.contains(valueBuffer);
+ break;
+ case "remove":
+ response = cache.remove(valueBuffer);
+ break;
+ default:
+ throw new IOException("IllegalRequest");
+ }
+
+ dos.writeBoolean(response.getResult());
+ dos.flush();
+
+ return true;
+ }
+
+ @Override
+ public void stop() throws IOException {
+ try {
+ super.stop();
+ } finally {
+ cache.shutdown();
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!stopped)
+ stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
new file mode 100644
index 0000000..920529d
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.File;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.server.CacheServer;
+import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+/**
+ * {@link DistributedCacheServer} variant that builds a {@link MapCacheServer}
+ * from the configured property values.
+ */
+public class DistributedMapCacheServer extends DistributedCacheServer {
+
+    /**
+     * Creates the map cache server described by the given configuration.
+     *
+     * @param context supplies the port, persistence path, SSL context service,
+     *        maximum cache size and eviction policy name
+     * @return a newly constructed {@link MapCacheServer}
+     * @throws IllegalArgumentException if the eviction policy name is not recognized
+     * @throws RuntimeException wrapping any exception raised while constructing the server
+     */
+    @Override
+    protected CacheServer createCacheServer(final ConfigurationContext context) {
+        final int port = context.getProperty(PORT).asInteger();
+        final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
+        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
+
+        // No SSL context service configured means no SSL context is passed to the server.
+        final SSLContext sslContext = (sslContextService == null)
+                ? null
+                : sslContextService.createSSLContext(ClientAuth.REQUIRED);
+
+        final EvictionPolicy evictionPolicy = toEvictionPolicy(evictionPolicyName);
+
+        try {
+            // A null persistence path is forwarded as a null directory.
+            final File persistenceDir = (persistencePath == null) ? null : new File(persistencePath);
+            return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Translates a configured eviction strategy name into its {@link EvictionPolicy}.
+     *
+     * @param evictionPolicyName the configured strategy name
+     * @return the matching policy
+     * @throws IllegalArgumentException if the name matches none of the known strategies
+     */
+    private static EvictionPolicy toEvictionPolicy(final String evictionPolicyName) {
+        switch (evictionPolicyName) {
+            case EVICTION_STRATEGY_FIFO:
+                return EvictionPolicy.FIFO;
+            case EVICTION_STRATEGY_LFU:
+                return EvictionPolicy.LFU;
+            case EVICTION_STRATEGY_LRU:
+                return EvictionPolicy.LRU;
+            default:
+                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
new file mode 100644
index 0000000..534cb0b
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Key/value cache operating on {@link ByteBuffer} keys and values.
+ */
+public interface MapCache {
+
+    /**
+     * Adds the mapping if the key is not already present.
+     *
+     * @param key the key to add
+     * @param value the value to associate with the key
+     * @return the outcome of the attempt
+     * @throws IOException if the operation fails
+     */
+    MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+
+    /**
+     * Reports whether the cache holds a mapping for the key.
+     *
+     * @param key the key to look up
+     * @return {@code true} if the key is present
+     * @throws IOException if the operation fails
+     */
+    boolean containsKey(ByteBuffer key) throws IOException;
+
+    /**
+     * Looks up the value stored for the key.
+     *
+     * @param key the key to look up
+     * @return the stored value; presumably {@code null} when the key is absent
+     *         (confirm against implementations)
+     * @throws IOException if the operation fails
+     */
+    ByteBuffer get(ByteBuffer key) throws IOException;
+
+    /**
+     * Removes the mapping for the key.
+     *
+     * @param key the key to remove
+     * @return the removed value; presumably {@code null} when nothing was removed
+     *         (confirm against implementations)
+     * @throws IOException if the operation fails
+     */
+    ByteBuffer remove(ByteBuffer key) throws IOException;
+
+    /**
+     * Shuts the cache down.
+     *
+     * @throws IOException if shutdown fails
+     */
+    void shutdown() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
new file mode 100644
index 0000000..b0ab0c4
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.distributed.cache.server.CacheRecord;
+
+/**
+ * A {@link CacheRecord} pairing a key buffer with a value buffer. Equality and
+ * hashing are based on both buffers.
+ */
+public class MapCacheRecord extends CacheRecord {
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+
+    /**
+     * @param key the record's key
+     * @param value the record's value
+     */
+    public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * @return the key buffer supplied at construction
+     */
+    public ByteBuffer getKey() {
+        return key;
+    }
+
+    /**
+     * @return the value buffer supplied at construction
+     */
+    public ByteBuffer getValue() {
+        return value;
+    }
+
+    /**
+     * Combines the hash codes of the key and the value.
+     */
+    @Override
+    public int hashCode() {
+        final int keyHash = key.hashCode();
+        final int valueHash = value.hashCode();
+        return 2938476 + keyHash * valueHash;
+    }
+
+    /**
+     * Two records are equal when both their keys and their values are equal.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof MapCacheRecord)) {
+            return false;
+        }
+
+        final MapCacheRecord other = (MapCacheRecord) obj;
+        if (!key.equals(other.key)) {
+            return false;
+        }
+        return value.equals(other.value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
new file mode 100644
index 0000000..3e8dd0e
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.io.DataOutputStream;
+
+/**
+ * Cache server that exposes a {@link MapCache} to remote clients over a simple
+ * binary request/response protocol.
+ */
+public class MapCacheServer extends AbstractCacheServer {
+
+    private final MapCache cache;
+
+    /**
+     * Creates the server and its backing cache.
+     *
+     * @param identifier identifier passed to the superclass and to the cache implementations
+     * @param sslContext SSL context passed to the superclass
+     * @param port port passed to the superclass
+     * @param maxSize maximum size passed to the in-memory cache
+     * @param evictionPolicy eviction policy passed to the in-memory cache
+     * @param persistencePath directory for the persistent cache, or {@code null}
+     *        to use only the in-memory cache
+     * @throws IOException if restoring the persistent cache fails
+     */
+    public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
+            final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
+        super(identifier, sslContext, port);
+
+        final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
+
+        if (persistencePath == null) {
+            this.cache = simpleCache;
+        } else {
+            // Wrap the in-memory cache and restore it before serving any request.
+            final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
+            persistentCache.restore();
+            this.cache = persistentCache;
+        }
+    }
+
+    /**
+     * Handles one request from a connected client.
+     * <p>
+     * A request is an action name (read with {@link DataInputStream#readUTF()})
+     * followed by the action's length-prefixed arguments. {@code putIfAbsent},
+     * {@code containsKey} and {@code remove} answer with a boolean;
+     * {@code getAndPutIfAbsent} and {@code get} answer with a 4-byte length
+     * followed by the value bytes, where a length of 0 is written when there is
+     * no value to return.
+     *
+     * @param in stream carrying the client's request
+     * @param out stream the response is written to
+     * @param version protocol version; not consulted by this implementation
+     * @return {@code false} if the client asked to close, {@code true} otherwise
+     * @throws IOException if reading or writing fails, an argument length is
+     *         negative, or the action is not recognized
+     */
+    @Override
+    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
+        final DataInputStream dis = new DataInputStream(in);
+        final DataOutputStream dos = new DataOutputStream(out);
+        final String action = dis.readUTF();
+        try {
+            switch (action) {
+                case "close": {
+                    return false;
+                }
+                case "putIfAbsent": {
+                    final byte[] key = readValue(dis);
+                    final byte[] value = readValue(dis);
+                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                    dos.writeBoolean(putResult.isSuccessful());
+                    break;
+                }
+                case "containsKey": {
+                    final byte[] key = readValue(dis);
+                    final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
+                    dos.writeBoolean(contains);
+                    break;
+                }
+                case "getAndPutIfAbsent": {
+                    final byte[] key = readValue(dis);
+                    final byte[] value = readValue(dis);
+
+                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                    if (putResult.isSuccessful()) {
+                        // The put succeeded, so there is no previous value to send back.
+                        dos.writeInt(0);
+                    } else {
+                        // Nothing was stored; send back the value that was already present.
+                        writeValue(dos, putResult.getExistingValue());
+                    }
+
+                    break;
+                }
+                case "get": {
+                    final byte[] key = readValue(dis);
+                    final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+                    if (existingValue == null) {
+                        // No value is stored for this key.
+                        dos.writeInt(0);
+                    } else {
+                        // Send back the stored value; the cache is not modified.
+                        writeValue(dos, existingValue);
+                    }
+
+                    break;
+                }
+                case "remove": {
+                    final byte[] key = readValue(dis);
+                    final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
+                    dos.writeBoolean(removed);
+                    break;
+                }
+                default: {
+                    throw new IOException("Illegal Request");
+                }
+            }
+        } finally {
+            // Flush whatever was written, even when the request failed part-way.
+            dos.flush();
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops the server and then shuts down the backing cache.
+     *
+     * @throws IOException if stopping the server or shutting down the cache fails
+     */
+    @Override
+    public void stop() throws IOException {
+        try {
+            super.stop();
+        } finally {
+            cache.shutdown();
+        }
+    }
+
+    /**
+     * Last-resort cleanup: stops the server if {@code stopped} is still false
+     * when this object is finalized.
+     *
+     * @throws Throwable if {@link #stop()} or the superclass finalizer fails
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            if (!stopped) {
+                stop();
+            }
+        } finally {
+            // Finalizers are not chained automatically; invoke the superclass one explicitly.
+            super.finalize();
+        }
+    }
+
+    /**
+     * Reads a 4-byte length followed by that many bytes.
+     *
+     * @param dis stream to read from
+     * @return the bytes read
+     * @throws IOException if the length is negative or the stream ends early
+     */
+    private byte[] readValue(final DataInputStream dis) throws IOException {
+        // The length comes straight off the wire. Reject a negative value explicitly
+        // rather than letting the allocation fail with a NegativeArraySizeException.
+        final int numBytes = dis.readInt();
+        if (numBytes < 0) {
+            throw new IOException("Illegal value length: " + numBytes);
+        }
+        final byte[] buffer = new byte[numBytes];
+        dis.readFully(buffer);
+        return buffer;
+    }
+
+    /**
+     * Writes a value as a 4-byte length followed by its bytes.
+     *
+     * @param dos stream to write to
+     * @param value buffer whose backing array is sent
+     * @throws IOException if writing fails
+     */
+    private void writeValue(final DataOutputStream dos, final ByteBuffer value) throws IOException {
+        // NOTE(review): sends the whole backing array and ignores position/limit;
+        // assumes cached buffers wrap exactly their content - confirm.
+        final byte[] byteArray = value.array();
+        dos.writeInt(byteArray.length);
+        dos.write(byteArray);
+    }
+
+}