You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/15 11:14:50 UTC
[05/12] incubator-nifi git commit: NIFI-169 well it finally all
builds. There is a classpath issue still to sort out which impacts startup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
new file mode 100644
index 0000000..5bead8c
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Map Map}.
+ *
+ */
+public interface DistributedMapCacheClient extends ControllerService {
+
+ /**
+ * Adds the specified key and value to the cache, if they are not already
+ * present, serializing the key and value with the given
+ * {@link Serializer}s.
+ *
+ * @param <K>
+ * @param <V>
+ * @param key the key for into the map
+ * @param value the value to add to the map if and only if the key is absent
+ * @param keySerializer
+ * @param valueSerializer
+ * @return true if the value was added to the cache, false if the value
+ * already existed in the cache
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
+
+ /**
+ * Adds the specified key and value to the cache, if they are not already
+ * present, serializing the key and value with the given
+ * {@link Serializer}s. If a value already exists in the cache for the given
+ * key, the value associated with the key is returned, after being
+ * deserialized with the given valueDeserializer.
+ *
+ * @param <K>
+ * @param <V>
+ * @param key
+ * @param value
+ * @param keySerializer
+ * @param valueSerializer
+ * @param valueDeserializer
+ * @return
+ * @throws IOException
+ */
+ <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException;
+
+ /**
+ * Determines if the given value is present in the cache and if so returns
+ * <code>true</code>, else returns <code>false</code>
+ *
+ * @param <K>
+ * @param key
+ * @param keySerializer
+ * @return
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
+
+ /**
+ * Returns the value in the cache for the given key, if one exists;
+ * otherwise returns <code>null</code>
+ *
+ * @param <K>
+ * @param <V>
+ * @param key the key to lookup in the map
+ * @param keySerializer
+ * @param valueDeserializer
+ *
+ * @return
+ * @throws IOException
+ */
+ <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
+
+ /**
+ * Attempts to notify the server that we are finished communicating with it
+ * and cleans up resources
+ * @throws java.io.IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * Removes the entry with the given key from the cache, if it is present.
+ *
+ * @param <K>
+ * @param key
+ * @param serializer
+ * @return <code>true</code> if the entry is removed, <code>false</code> if
+ * the key did not exist in the cache
+ * @throws IOException
+ */
+ <K> boolean remove(K key, Serializer<K> serializer) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
new file mode 100644
index 0000000..12aae3e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Set Set}.
+ */
+public interface DistributedSetCacheClient extends ControllerService {
+
+ /**
+ * Adds the specified value to the cache, serializing the value with the
+ * given {@link Serializer}.
+ *
+ * @param <T>
+ * @param value
+ * @param serializer
+ * @return true if the value was added to the cache, false if the value
+ * already existed in the cache
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ <T> boolean addIfAbsent(T value, Serializer<T> serializer) throws IOException;
+
+ /**
+ * Returns if the given value is present in the cache and if so returns
+ * <code>true</code>, else returns <code>false</code>
+ *
+ * @param <T>
+ * @param value
+ * @param serializer
+ * @return
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ <T> boolean contains(T value, Serializer<T> serializer) throws IOException;
+
+ /**
+ * Removes the given value from the cache, if it is present.
+ *
+ * @param <T>
+ * @param value
+ * @param serializer
+ * @return <code>true</code> if the value is removed, <code>false</code> if
+ * the value did not exist in the cache
+ * @throws IOException
+ */
+ <T> boolean remove(T value, Serializer<T> serializer) throws IOException;
+
+ /**
+ * Attempts to notify the server that we are finished communicating with it
+ * and cleans up resources
+ * @throws java.io.IOException
+ */
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
new file mode 100644
index 0000000..f1896be
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+/**
+ * Provides a mechanism by which a value can be serialized to a stream of bytes
+ * @param <T>
+ */
+public interface Serializer<T> {
+
+ /**
+ * Serializes the given value to the {@link OutputStream}
+ *
+ * @param value
+ * @param output
+ * @throws SerializationException If unable to serialize the given value
+ * @throws java.io.IOException
+ */
+ void serialize(T value, OutputStream output) throws SerializationException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
new file mode 100644
index 0000000..bb2fcb2
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.exception;
+
+public class DeserializationException extends RuntimeException {
+
+ public DeserializationException(final Throwable cause) {
+ super(cause);
+ }
+
+ public DeserializationException(final String message) {
+ super(message);
+ }
+
+ public DeserializationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
new file mode 100644
index 0000000..aac59f5
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client.exception;
+
+public class SerializationException extends RuntimeException {
+
+ public SerializationException(final Throwable cause) {
+ super(cause);
+ }
+
+ public SerializationException(final String message) {
+ super(message);
+ }
+
+ public SerializationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
new file mode 100644
index 0000000..a251393
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
@@ -0,0 +1,60 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-client-service</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Distributed Cache Client Service</name>
+ <description>Provides a Client for interfacing with a Distributed Cache</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-protocol</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-stream-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
new file mode 100644
index 0000000..f838c2f
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+public interface CommsSession extends Closeable {
+
+ void setTimeout(final long value, final TimeUnit timeUnit);
+
+ InputStream getInputStream() throws IOException;
+
+ OutputStream getOutputStream() throws IOException;
+
+ boolean isClosed();
+
+ void interrupt();
+
+ String getHostname();
+
+ int getPort();
+
+ long getTimeout(TimeUnit timeUnit);
+
+ SSLContext getSSLContext();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
new file mode 100644
index 0000000..ee96660
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+ public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Server Hostname")
+ .description("The name of the server that is running the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Server Port")
+ .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .defaultValue("4557")
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description(
+ "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .required(false)
+ .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .defaultValue(null)
+ .build();
+ public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .description(
+ "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 secs")
+ .build();
+
+ private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+ private volatile ConfigurationContext configContext;
+ private volatile boolean closed = false;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PORT);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(COMMUNICATIONS_TIMEOUT);
+ return descriptors;
+ }
+
+ @OnConfigured
+ public void cacheConfig(final ConfigurationContext context) {
+ this.configContext = context;
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
+ throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("putIfAbsent");
+
+ serialize(key, keySerializer, dos);
+ serialize(value, valueSerializer, dos);
+
+ dos.flush();
+
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("containsKey");
+
+ serialize(key, keySerializer, dos);
+ dos.flush();
+
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+ final Deserializer<V> valueDeserializer) throws IOException {
+ return withCommsSession(new CommsAction<V>() {
+ @Override
+ public V execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("getAndPutIfAbsent");
+
+ serialize(key, keySerializer, dos);
+ serialize(value, valueSerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+ return valueDeserializer.deserialize(responseBuffer);
+ }
+ });
+ }
+
+ @Override
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+ return withCommsSession(new CommsAction<V>() {
+ @Override
+ public V execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("get");
+
+ serialize(key, keySerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+ return valueDeserializer.deserialize(responseBuffer);
+ }
+ });
+ }
+
+ @Override
+ public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("remove");
+
+ serialize(key, serializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
+ final int responseLength = dis.readInt();
+ final byte[] responseBuffer = new byte[responseLength];
+ dis.readFully(responseBuffer);
+ return responseBuffer;
+ }
+
+ public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+ final String hostname = context.getProperty(HOSTNAME).getValue();
+ final int port = context.getProperty(PORT).asInteger();
+ final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ final CommsSession commsSession;
+ if (sslContextService == null) {
+ commsSession = new StandardCommsSession(hostname, port);
+ } else {
+ commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+ }
+
+ commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ return commsSession;
+ }
+
+ private CommsSession leaseCommsSession() throws IOException {
+ CommsSession session = queue.poll();
+ if (session != null && !session.isClosed()) {
+ return session;
+ }
+
+ session = createCommsSession(configContext);
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+ try {
+ ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+ } catch (final HandshakeException e) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+
+ throw new IOException(e);
+ }
+
+ return session;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.closed = true;
+
+ CommsSession commsSession;
+ while ((commsSession = queue.poll()) != null) {
+ try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+ dos.writeUTF("close");
+ dos.flush();
+ commsSession.close();
+ } catch (final IOException e) {
+ }
+ }
+ logger.info("Closed {}", new Object[] { getIdentifier() });
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!closed)
+ close();
+ logger.debug("Finalize called");
+ }
+
+ private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(value, baos);
+ dos.writeInt(baos.size());
+ baos.writeTo(dos);
+ }
+
+ private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
+ if (closed) {
+ throw new IllegalStateException("Client is closed");
+ }
+
+ final CommsSession session = leaseCommsSession();
+ try {
+ return action.execute(session);
+ } catch (final IOException ioe) {
+ try {
+ session.close();
+ } catch (final IOException ignored) {
+ }
+
+ throw ioe;
+ } finally {
+ if (!session.isClosed()) {
+ if (this.closed) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+ } else {
+ queue.offer(session);
+ }
+ }
+ }
+ }
+
+ private static interface CommsAction<T> {
+ T execute(CommsSession commsSession) throws IOException;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
new file mode 100644
index 0000000..1d7c94c
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+ public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Server Hostname")
+ .description("The name of the server that is running the DistributedSetCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Server Port")
+ .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .defaultValue("4557")
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description(
+ "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .required(false)
+ .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .defaultValue(null)
+ .build();
+ public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .description(
+ "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 secs")
+ .build();
+
+ private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+ private volatile ConfigurationContext configContext;
+ private volatile boolean closed = false;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PORT);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(COMMUNICATIONS_TIMEOUT);
+ return descriptors;
+ }
+
+ @OnConfigured
+ public void onConfigured(final ConfigurationContext context) {
+ this.configContext = context;
+ }
+
+ public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+ final String hostname = context.getProperty(HOSTNAME).getValue();
+ final int port = context.getProperty(PORT).asInteger();
+ final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ final CommsSession commsSession;
+ if (sslContextService == null) {
+ commsSession = new StandardCommsSession(hostname, port);
+ } else {
+ commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+ }
+
+ commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ return commsSession;
+ }
+
+ private CommsSession leaseCommsSession() throws IOException {
+ CommsSession session = queue.poll();
+ if (session != null && !session.isClosed()) {
+ return session;
+ }
+
+ session = createCommsSession(configContext);
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+ try {
+ ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+ } catch (final HandshakeException e) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+
+ throw new IOException(e);
+ }
+
+ return session;
+ }
+
+ @Override
+ public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
+ return invokeRemoteBoolean("addIfAbsent", value, serializer);
+ }
+
+ @Override
+ public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
+ return invokeRemoteBoolean("contains", value, serializer);
+ }
+
+ @Override
+ public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
+ return invokeRemoteBoolean("remove", value, serializer);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.closed = true;
+
+ CommsSession commsSession;
+ while ((commsSession = queue.poll()) != null) {
+ try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+ dos.writeUTF("close");
+ dos.flush();
+ commsSession.close();
+ } catch (final IOException e) {
+ }
+ }
+ logger.info("Closed {}", new Object[] { getIdentifier() });
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!closed)
+ close();
+ logger.debug("Finalize called");
+ }
+
+ private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
+ if (closed) {
+ throw new IllegalStateException("Client is closed");
+ }
+
+ final CommsSession session = leaseCommsSession();
+ try {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF(methodName);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(value, baos);
+ dos.writeInt(baos.size());
+ baos.writeTo(dos);
+ dos.flush();
+
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ } catch (final IOException ioe) {
+ try {
+ session.close();
+ } catch (final IOException ignored) {
+ }
+
+ throw ioe;
+ } finally {
+ if (!session.isClosed()) {
+ if (this.closed) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+ } else {
+ queue.offer(session);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
new file mode 100644
index 0000000..c8be082
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+public class SSLCommsSession implements CommsSession {
+ private final SSLSocketChannel sslSocketChannel;
+ private final SSLContext sslContext;
+ private final String hostname;
+ private final int port;
+
+ private final SSLSocketChannelInputStream in;
+ private final BufferedInputStream bufferedIn;
+
+ private final SSLSocketChannelOutputStream out;
+ private final BufferedOutputStream bufferedOut;
+
+ public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
+ sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
+
+ in = new SSLSocketChannelInputStream(sslSocketChannel);
+ bufferedIn = new BufferedInputStream(in);
+
+ out = new SSLSocketChannelOutputStream(sslSocketChannel);
+ bufferedOut = new BufferedOutputStream(out);
+
+ this.sslContext = sslContext;
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ @Override
+ public void interrupt() {
+ sslSocketChannel.interrupt();
+ }
+
+ @Override
+ public void close() throws IOException {
+ sslSocketChannel.close();
+ }
+
+ @Override
+ public void setTimeout(final long value, final TimeUnit timeUnit) {
+ sslSocketChannel.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return bufferedIn;
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return bufferedOut;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return sslSocketChannel.isClosed();
+ }
+
+ @Override
+ public String getHostname() {
+ return hostname;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+ @Override
+ public SSLContext getSSLContext() {
+ return sslContext;
+ }
+ @Override
+ public long getTimeout(final TimeUnit timeUnit) {
+ return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
new file mode 100644
index 0000000..bbe2917
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.io.InterruptableOutputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+public class StandardCommsSession implements CommsSession {
+ private final SocketChannel socketChannel;
+ private final String hostname;
+ private final int port;
+ private volatile long timeoutMillis;
+
+ private final SocketChannelInputStream in;
+ private final InterruptableInputStream bufferedIn;
+
+ private final SocketChannelOutputStream out;
+ private final InterruptableOutputStream bufferedOut;
+
+ public StandardCommsSession(final String hostname, final int port) throws IOException {
+ socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
+ socketChannel.configureBlocking(false);
+ in = new SocketChannelInputStream(socketChannel);
+ bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
+
+ out = new SocketChannelOutputStream(socketChannel);
+ bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
+
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ @Override
+ public void interrupt() {
+ bufferedIn.interrupt();
+ bufferedOut.interrupt();
+ }
+
+ @Override
+ public void close() throws IOException {
+ socketChannel.close();
+ }
+
+ @Override
+ public void setTimeout(final long value, final TimeUnit timeUnit) {
+ in.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+ out.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
+ timeoutMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return bufferedIn;
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return bufferedOut;
+ }
+
+ @Override
+ public boolean isClosed() {
+ boolean closed = !socketChannel.isConnected();
+ if (!closed) {
+ try {
+ this.in.isDataAvailable();
+ } catch (IOException e) {
+ try {
+ close();
+ } catch (IOException e1) {
+ }
+ closed = true;
+ }
+ }
+ return closed;
+ }
+
+ @Override
+ public String getHostname() {
+ return hostname;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public SSLContext getSSLContext() {
+ return null;
+ }
+
+ @Override
+ public long getTimeout(final TimeUnit timeUnit) {
+ return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..a91f7ee
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
+org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
new file mode 100644
index 0000000..d5f3595
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
@@ -0,0 +1,78 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+<meta charset="utf-8" />
+<title>Distributed Map Cache Client Service</title>
+<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+ <h2>Description:</h2>
+
+ <p>A Controller Service that can be used to communicate with a
+ Distributed Map Cache Server.</p>
+
+
+
+ <p>
+ <strong>Properties:</strong>
+ </p>
+ <p>In the list below, the names of required properties appear
+ in bold. Any other properties (not in bold) are considered optional.
+ If a property has a default value, it is indicated. If a property
+ supports the use of the NiFi Expression Language (or simply,
+ "expression language"), that is also indicated.</p>
+
+ <ul>
+ <li><strong>Server Hostname</strong>
+ <ul>
+ <li>The name of the server that is running the DistributedMapCacheServer service</li>
+ <li>Default value: no default</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Server Port</strong>
+ <ul>
+ <li>The port on the remote server that is to be used when communicating with the
+ <a href="../nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li>
+
+ <li>Default value: 4557</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li>SSL Context Service
+ <ul>
+ <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted
+ <li>Default value: no default</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+ <li><strong>Communications Timeout</strong>
+ <ul>
+ <li>Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received
+ <li>Default value: 30 secs</li>
+ <li>Supports expression language: false</li>
+ </ul></li>
+
+ </ul>
+
+
+ <i>See Also:</i>
+ <ul>
+ <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li>
+ <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
+ </ul>
+
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
new file mode 100755
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
new file mode 100644
index 0000000..f636261
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-protocol</artifactId>
+ <name>Distributed Cache Protocol</name>
+
+ <description>
+ Defines the communications protocol that is used between clients and servers
+ for the Distributed Cache services
+ </description>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
new file mode 100644
index 0000000..da2acad
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.remote.VersionNegotiator;
+
+public class ProtocolHandshake {
+
+ public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };
+
+ public static final int RESOURCE_OK = 20;
+ public static final int DIFFERENT_RESOURCE_VERSION = 21;
+ public static final int ABORT = 255;
+
+
+ public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+ final DataInputStream dis = new DataInputStream(in);
+ final DataOutputStream dos = new DataOutputStream(out);
+
+ try {
+ dos.write(MAGIC_HEADER);
+
+ initiateVersionNegotiation(versionNegotiator, dis, dos);
+ } finally {
+ dos.flush();
+ }
+ }
+
+
+ public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
+ final DataInputStream dis = new DataInputStream(in);
+ final DataOutputStream dos = new DataOutputStream(out);
+
+ try {
+ final byte[] magicHeaderBuffer = new byte[4];
+ dis.readFully(magicHeaderBuffer);
+
+ receiveVersionNegotiation(versionNegotiator, dis, dos);
+ } finally {
+ dos.flush();
+ }
+ }
+
+
+ private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+ // Write the classname of the RemoteStreamCodec, followed by its version
+ dos.writeInt(negotiator.getVersion());
+ dos.flush();
+
+ // wait for response from server.
+ final int statusCode = dis.read();
+ switch (statusCode) {
+ case RESOURCE_OK: // server accepted our proposal of codec name/version
+ return;
+ case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
+ // Get server's preferred version
+ final int newVersion = dis.readInt();
+
+ // Determine our new preferred version that is no greater than the server's preferred version.
+ final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+ // If we could not agree with server on a version, fail now.
+ if ( newPreference == null ) {
+ throw new HandshakeException("Could not agree on protocol version");
+ }
+
+ negotiator.setVersion(newPreference);
+
+ // Attempt negotiation of resource based on our new preferred version.
+ initiateVersionNegotiation(negotiator, dis, dos);
+ case ABORT:
+ throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+ default:
+ throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
+ }
+ }
+
+ private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
+ final int version = dis.readInt();
+ if ( negotiator.isVersionSupported(version) ) {
+ dos.write(RESOURCE_OK);
+ dos.flush();
+
+ negotiator.setVersion(version);
+ } else {
+ final Integer preferred = negotiator.getPreferredVersion(version);
+ if ( preferred == null ) {
+ dos.write(ABORT);
+ dos.flush();
+ throw new HandshakeException("Unable to negotiate an acceptable version of the Distributed Cache Protocol");
+ }
+ dos.write(DIFFERENT_RESOURCE_VERSION);
+ dos.writeInt(preferred);
+ dos.flush();
+
+ receiveVersionNegotiation(negotiator, dis, dos);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
new file mode 100644
index 0000000..8049d42
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.protocol.exception;
+
+public class HandshakeException extends Exception {
+ public HandshakeException(final String message) {
+ super(message);
+ }
+
+ public HandshakeException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
new file mode 100644
index 0000000..b57d284
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml
@@ -0,0 +1,78 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-server</artifactId>
+
+ <name>Distributed Cache Server</name>
+ <description>Provides a Controller Service for hosting Distributed Caches</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-protocol</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-stream-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>wali</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
new file mode 100644
index 0000000..9b4e70e
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractCacheServer implements CacheServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
+
+ private final String identifier;
+ private final int port;
+ private final SSLContext sslContext;
+ protected volatile boolean stopped = false;
+ private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();;
+
+ private volatile ServerSocketChannel serverSocketChannel;
+
+ public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
+ this.identifier = identifier;
+ this.port = port;
+ this.sslContext = sslContext;
+ }
+
+ @Override
+ public void start() throws IOException {
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(true);
+ serverSocketChannel.bind(new InetSocketAddress(port));
+
+ final Runnable runnable = new Runnable() {
+
+ @Override
+ public void run() {
+ while (true) {
+ final SocketChannel socketChannel;
+ try {
+ socketChannel = serverSocketChannel.accept();
+ logger.debug("Connected to {}", new Object[] { socketChannel });
+ } catch (final IOException e) {
+ if (!stopped) {
+ logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ return;
+ }
+
+ final Runnable processInputRunnable = new Runnable() {
+ @Override
+ public void run() {
+ final InputStream rawInputStream;
+ final OutputStream rawOutputStream;
+ final String peer = socketChannel.socket().getInetAddress().getHostName();
+
+ try {
+ if (sslContext == null) {
+ rawInputStream = new SocketChannelInputStream(socketChannel);
+ rawOutputStream = new SocketChannelOutputStream(socketChannel);
+ } else {
+ final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
+ sslSocketChannel.connect();
+ rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
+ rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
+ }
+ } catch (IOException e) {
+ logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ try {
+ socketChannel.close();
+ } catch (IOException swallow) {
+ }
+
+ return;
+ }
+ try (final InputStream in = new BufferedInputStream(rawInputStream);
+ final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+
+ ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
+
+ boolean continueComms = true;
+ while (continueComms) {
+ continueComms = listen(in, out, versionNegotiator.getVersion());
+ }
+ // client has issued 'close'
+ logger.debug("Client issued close on {}", new Object[] { socketChannel });
+ } catch (final SocketTimeoutException e) {
+ logger.debug("30 sec timeout reached", e);
+ } catch (final IOException | HandshakeException e) {
+ if (!stopped) {
+ logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() });
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ } finally {
+ processInputThreads.remove(Thread.currentThread());
+ }
+ }
+ };
+
+ final Thread processInputThread = new Thread(processInputRunnable);
+ processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
+ processInputThread.setDaemon(true);
+ processInputThread.start();
+ processInputThreads.add(processInputThread);
+ }
+ }
+ };
+
+ final Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ thread.setName("Distributed Cache Server: " + identifier);
+ thread.start();
+ }
+
+ @Override
+ public void stop() throws IOException {
+ stopped = true;
+ logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
+
+ if (serverSocketChannel != null) {
+ serverSocketChannel.close();
+ }
+ // need to close out the created SocketChannels...this is done by interrupting
+ // the created threads that loop on listen().
+ for (Thread processInputThread : processInputThreads) {
+ processInputThread.interrupt();
+ int i = 0;
+ while (!processInputThread.isInterrupted() && i++ < 5) {
+ try {
+ Thread.sleep(50); // allow thread to gracefully terminate
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ processInputThreads.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "CacheServer[id=" + identifier + "]";
+ }
+
+ /**
+ * Listens for incoming data and communicates with remote peer
+ *
+ * @param in
+ * @param out
+ * @param version
+ * @return <code>true</code> if communications should continue, <code>false</code> otherwise
+ * @throws IOException
+ */
+ protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
new file mode 100644
index 0000000..71ac56d
--- /dev/null
+++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CacheRecord {
+
+ private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+ private final long id;
+ private final long entryDate;
+ private volatile long lastHitDate;
+ private final AtomicInteger hitCount = new AtomicInteger(0);
+
+ public CacheRecord() {
+ entryDate = System.currentTimeMillis();
+ lastHitDate = entryDate;
+ id = idGenerator.getAndIncrement();
+ }
+
+ public long getEntryDate() {
+ return entryDate;
+ }
+
+ public long getLastHitDate() {
+ return lastHitDate;
+ }
+
+ public int getHitCount() {
+ return hitCount.get();
+ }
+
+ public void hit() {
+ hitCount.getAndIncrement();
+ lastHitDate = System.currentTimeMillis();
+ }
+
+ public long getId() {
+ return id;
+ }
+}