You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/15 11:14:54 UTC
[09/12] incubator-nifi git commit: NIFI-169 well it finally all
builds. There is a classpath issue still to sort out which impacts startup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
deleted file mode 100644
index 8049d42..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.protocol.exception;
-
-public class HandshakeException extends Exception {
- public HandshakeException(final String message) {
- super(message);
- }
-
- public HandshakeException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
deleted file mode 100644
index 5dec322..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-services-bundle</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>distributed-cache-server</artifactId>
-
- <name>Distributed Cache Server</name>
- <description>Provides a Controller Service for hosting Distributed Caches</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-protocol</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>remote-communications-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-processor-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-stream-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>ssl-context-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>wali</groupId>
- <artifactId>wali</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-client-service-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-client-service</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>ssl-context-service</artifactId>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
deleted file mode 100644
index 9b4e70e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.io.BufferedInputStream;
-import org.apache.nifi.io.BufferedOutputStream;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractCacheServer implements CacheServer {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
-
- private final String identifier;
- private final int port;
- private final SSLContext sslContext;
- protected volatile boolean stopped = false;
- private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();;
-
- private volatile ServerSocketChannel serverSocketChannel;
-
- public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) {
- this.identifier = identifier;
- this.port = port;
- this.sslContext = sslContext;
- }
-
- @Override
- public void start() throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(true);
- serverSocketChannel.bind(new InetSocketAddress(port));
-
- final Runnable runnable = new Runnable() {
-
- @Override
- public void run() {
- while (true) {
- final SocketChannel socketChannel;
- try {
- socketChannel = serverSocketChannel.accept();
- logger.debug("Connected to {}", new Object[] { socketChannel });
- } catch (final IOException e) {
- if (!stopped) {
- logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- return;
- }
-
- final Runnable processInputRunnable = new Runnable() {
- @Override
- public void run() {
- final InputStream rawInputStream;
- final OutputStream rawOutputStream;
- final String peer = socketChannel.socket().getInetAddress().getHostName();
-
- try {
- if (sslContext == null) {
- rawInputStream = new SocketChannelInputStream(socketChannel);
- rawOutputStream = new SocketChannelOutputStream(socketChannel);
- } else {
- final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
- sslSocketChannel.connect();
- rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
- rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
- }
- } catch (IOException e) {
- logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e);
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- try {
- socketChannel.close();
- } catch (IOException swallow) {
- }
-
- return;
- }
- try (final InputStream in = new BufferedInputStream(rawInputStream);
- final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
-
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
-
- ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
-
- boolean continueComms = true;
- while (continueComms) {
- continueComms = listen(in, out, versionNegotiator.getVersion());
- }
- // client has issued 'close'
- logger.debug("Client issued close on {}", new Object[] { socketChannel });
- } catch (final SocketTimeoutException e) {
- logger.debug("30 sec timeout reached", e);
- } catch (final IOException | HandshakeException e) {
- if (!stopped) {
- logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() });
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- } finally {
- processInputThreads.remove(Thread.currentThread());
- }
- }
- };
-
- final Thread processInputThread = new Thread(processInputRunnable);
- processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
- processInputThread.setDaemon(true);
- processInputThread.start();
- processInputThreads.add(processInputThread);
- }
- }
- };
-
- final Thread thread = new Thread(runnable);
- thread.setDaemon(true);
- thread.setName("Distributed Cache Server: " + identifier);
- thread.start();
- }
-
- @Override
- public void stop() throws IOException {
- stopped = true;
- logger.info("Stopping CacheServer {}", new Object[] { this.identifier });
-
- if (serverSocketChannel != null) {
- serverSocketChannel.close();
- }
- // need to close out the created SocketChannels...this is done by interrupting
- // the created threads that loop on listen().
- for (Thread processInputThread : processInputThreads) {
- processInputThread.interrupt();
- int i = 0;
- while (!processInputThread.isInterrupted() && i++ < 5) {
- try {
- Thread.sleep(50); // allow thread to gracefully terminate
- } catch (InterruptedException e) {
- }
- }
- }
- processInputThreads.clear();
- }
-
- @Override
- public String toString() {
- return "CacheServer[id=" + identifier + "]";
- }
-
- /**
- * Listens for incoming data and communicates with remote peer
- *
- * @param in
- * @param out
- * @param version
- * @return <code>true</code> if communications should continue, <code>false</code> otherwise
- * @throws IOException
- */
- protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
deleted file mode 100644
index 71ac56d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class CacheRecord {
-
- private static final AtomicLong idGenerator = new AtomicLong(0L);
-
- private final long id;
- private final long entryDate;
- private volatile long lastHitDate;
- private final AtomicInteger hitCount = new AtomicInteger(0);
-
- public CacheRecord() {
- entryDate = System.currentTimeMillis();
- lastHitDate = entryDate;
- id = idGenerator.getAndIncrement();
- }
-
- public long getEntryDate() {
- return entryDate;
- }
-
- public long getLastHitDate() {
- return lastHitDate;
- }
-
- public int getHitCount() {
- return hitCount.get();
- }
-
- public void hit() {
- hitCount.getAndIncrement();
- lastHitDate = System.currentTimeMillis();
- }
-
- public long getId() {
- return id;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
deleted file mode 100644
index 2c85cd8..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-
-public interface CacheServer {
-
- void start() throws IOException;
- void stop() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
deleted file mode 100644
index 0f962d0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.processor.annotation.OnShutdown;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-
-public abstract class DistributedCacheServer extends AbstractControllerService {
- public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
- public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
- public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
-
- public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Port")
- .description("The port to listen on for incoming connections")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .defaultValue("4557")
- .build();
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description(
- "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure")
- .required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .build();
- public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder()
- .name("Maximum Cache Entries")
- .description("The maximum number of cache entries that the cache can hold")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("10000")
- .build();
- public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder()
- .name("Eviction Strategy")
- .description("Determines which strategy should be used to evict values from the cache to make room for new entries")
- .required(true)
- .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO)
- .defaultValue(EVICTION_STRATEGY_LFU)
- .build();
- public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder()
- .name("Persistence Directory")
- .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only")
- .required(false)
- .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
- .build();
-
- private volatile CacheServer cacheServer;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(PORT);
- properties.add(MAX_CACHE_ENTRIES);
- properties.add(EVICTION_POLICY);
- properties.add(PERSISTENCE_PATH);
- properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues(
- getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build());
- return properties;
- }
-
- @OnConfigured
- public void startServer(final ConfigurationContext context) throws IOException {
- if (cacheServer == null) {
- cacheServer = createCacheServer(context);
- cacheServer.start();
- }
- }
-
- @OnShutdown
- public void shutdownServer() throws IOException {
- if (cacheServer != null) {
- cacheServer.stop();
- }
- cacheServer = null;
- }
-
- @Override
- protected void finalize() throws Throwable {
- shutdownServer();
- }
-
- protected abstract CacheServer createCacheServer(ConfigurationContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
deleted file mode 100644
index 426573f..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.File;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-public class DistributedSetCacheServer extends DistributedCacheServer {
-
- @Override
- protected CacheServer createCacheServer(final ConfigurationContext context) {
- final int port = context.getProperty(PORT).asInteger();
- final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
- final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
-
- final SSLContext sslContext;
- if ( sslContextService == null ) {
- sslContext = null;
- } else {
- sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
- }
-
- final EvictionPolicy evictionPolicy;
- switch (evictionPolicyName) {
- case EVICTION_STRATEGY_FIFO:
- evictionPolicy = EvictionPolicy.FIFO;
- break;
- case EVICTION_STRATEGY_LFU:
- evictionPolicy = EvictionPolicy.LFU;
- break;
- case EVICTION_STRATEGY_LRU:
- evictionPolicy = EvictionPolicy.LRU;
- break;
- default:
- throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
- }
-
- try {
- final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
-
- return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
deleted file mode 100644
index 60bd2c1..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.util.Comparator;
-
-public enum EvictionPolicy {
- LFU(new LFUComparator()),
- LRU(new LRUComparator()),
- FIFO(new FIFOComparator());
-
- private final Comparator<CacheRecord> comparator;
-
- private EvictionPolicy(final Comparator<CacheRecord> comparator) {
- this.comparator = comparator;
- }
-
- public Comparator<CacheRecord> getComparator() {
- return comparator;
- }
-
- public static class LFUComparator implements Comparator<CacheRecord> {
- @Override
- public int compare(final CacheRecord o1, final CacheRecord o2) {
- if ( o1.equals(o2) ) {
- return 0;
- }
-
- final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount());
- final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison;
- return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
- }
- }
-
- public static class LRUComparator implements Comparator<CacheRecord> {
- @Override
- public int compare(final CacheRecord o1, final CacheRecord o2) {
- if ( o1.equals(o2) ) {
- return 0;
- }
-
- final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate());
- return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison);
- }
- }
-
- public static class FIFOComparator implements Comparator<CacheRecord> {
- @Override
- public int compare(final CacheRecord o1, final CacheRecord o2) {
- if ( o1.equals(o2) ) {
- return 0;
- }
-
- final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate());
- return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
deleted file mode 100644
index 5d2c0f6..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCache;
-import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
-import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
-import org.apache.nifi.io.DataOutputStream;
-
-public class SetCacheServer extends AbstractCacheServer {
-
- private final SetCache cache;
-
- public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
- final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
- super(identifier, sslContext, port);
-
- final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
-
- if (persistencePath == null) {
- this.cache = simpleCache;
- } else {
- final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
- persistentCache.restore();
- this.cache = persistentCache;
- }
- }
-
- @Override
- protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
- final DataInputStream dis = new DataInputStream(in);
- final DataOutputStream dos = new DataOutputStream(out);
-
- final String action = dis.readUTF();
- if (action.equals("close")) {
- return false;
- }
-
- final int valueLength = dis.readInt();
- final byte[] value = new byte[valueLength];
- dis.readFully(value);
- final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
-
- final SetCacheResult response;
- switch (action) {
- case "addIfAbsent":
- response = cache.addIfAbsent(valueBuffer);
- break;
- case "contains":
- response = cache.contains(valueBuffer);
- break;
- case "remove":
- response = cache.remove(valueBuffer);
- break;
- default:
- throw new IOException("IllegalRequest");
- }
-
- dos.writeBoolean(response.getResult());
- dos.flush();
-
- return true;
- }
-
- @Override
- public void stop() throws IOException {
- try {
- super.stop();
- } finally {
- cache.shutdown();
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!stopped)
- stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
deleted file mode 100644
index 920529d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.File;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.server.CacheServer;
-import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-public class DistributedMapCacheServer extends DistributedCacheServer {
-
- @Override
- protected CacheServer createCacheServer(final ConfigurationContext context) {
- final int port = context.getProperty(PORT).asInteger();
- final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue();
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
- final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
-
- final SSLContext sslContext;
- if ( sslContextService == null ) {
- sslContext = null;
- } else {
- sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
- }
-
- final EvictionPolicy evictionPolicy;
- switch (evictionPolicyName) {
- case EVICTION_STRATEGY_FIFO:
- evictionPolicy = EvictionPolicy.FIFO;
- break;
- case EVICTION_STRATEGY_LFU:
- evictionPolicy = EvictionPolicy.LFU;
- break;
- case EVICTION_STRATEGY_LRU:
- evictionPolicy = EvictionPolicy.LRU;
- break;
- default:
- throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
- }
-
- try {
- final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
-
- return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
deleted file mode 100644
index 534cb0b..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface MapCache {
-
- MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
- boolean containsKey(ByteBuffer key) throws IOException;
- ByteBuffer get(ByteBuffer key) throws IOException;
- ByteBuffer remove(ByteBuffer key) throws IOException;
- void shutdown() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
deleted file mode 100644
index b0ab0c4..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.distributed.cache.server.CacheRecord;
-
-public class MapCacheRecord extends CacheRecord {
- private final ByteBuffer key;
- private final ByteBuffer value;
-
- public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
- this.key = key;
- this.value = value;
- }
-
- public ByteBuffer getKey() {
- return key;
- }
-
- public ByteBuffer getValue() {
- return value;
- }
-
- @Override
- public int hashCode() {
- return 2938476 + key.hashCode() * value.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if ( obj == this ) {
- return true;
- }
-
- if ( obj instanceof MapCacheRecord ) {
- final MapCacheRecord that = ((MapCacheRecord) obj);
- return key.equals(that.key) && value.equals(that.value);
- }
-
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
deleted file mode 100644
index 3e8dd0e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.io.DataOutputStream;
-
-public class MapCacheServer extends AbstractCacheServer {
-
- private final MapCache cache;
-
- public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
- final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException {
- super(identifier, sslContext, port);
-
- final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
-
- if (persistencePath == null) {
- this.cache = simpleCache;
- } else {
- final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
- persistentCache.restore();
- this.cache = persistentCache;
- }
- }
-
- @Override
- protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
- final DataInputStream dis = new DataInputStream(in);
- final DataOutputStream dos = new DataOutputStream(out);
- final String action = dis.readUTF();
- try {
- switch (action) {
- case "close": {
- return false;
- }
- case "putIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- dos.writeBoolean(putResult.isSuccessful());
- break;
- }
- case "containsKey": {
- final byte[] key = readValue(dis);
- final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
- dos.writeBoolean(contains);
- break;
- }
- case "getAndPutIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
-
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- if (putResult.isSuccessful()) {
- // Put was successful. There was no old value to get.
- dos.writeInt(0);
- } else {
- // we didn't put. Write back the previous value
- final byte[] byteArray = putResult.getExistingValue().array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "get": {
- final byte[] key = readValue(dis);
- final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
- if (existingValue == null) {
- // there was no existing value; we did a "put".
- dos.writeInt(0);
- } else {
- // a value already existed. we did not update the map
- final byte[] byteArray = existingValue.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "remove": {
- final byte[] key = readValue(dis);
- final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
- dos.writeBoolean(removed);
- break;
- }
- default: {
- throw new IOException("Illegal Request");
- }
- }
- } finally {
- dos.flush();
- }
-
- return true;
- }
-
- @Override
- public void stop() throws IOException {
- try {
- super.stop();
- } finally {
- cache.shutdown();
- }
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!stopped)
- stop();
- }
-
- private byte[] readValue(final DataInputStream dis) throws IOException {
- final int numBytes = dis.readInt();
- final byte[] buffer = new byte[numBytes];
- dis.readFully(buffer);
- return buffer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
deleted file mode 100644
index 29695eb..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.nio.ByteBuffer;
-
-public class MapPutResult {
- private final boolean successful;
- private final ByteBuffer key, value;
- private final ByteBuffer existingValue;
- private final ByteBuffer evictedKey, evictedValue;
-
- public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
- this.successful = successful;
- this.key = key;
- this.value = value;
- this.existingValue = existingValue;
- this.evictedKey = evictedKey;
- this.evictedValue = evictedValue;
- }
-
- public boolean isSuccessful() {
- return successful;
- }
-
- public ByteBuffer getKey() {
- return key;
- }
-
- public ByteBuffer getValue() {
- return value;
- }
-
- public ByteBuffer getExistingValue() {
- return existingValue;
- }
-
- public ByteBuffer getEvictedKey() {
- return evictedKey;
- }
-
- public ByteBuffer getEvictedValue() {
- return evictedValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
deleted file mode 100644
index 77fb77d..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
-
-public class PersistentMapCache implements MapCache {
-
- private final MapCache wrapped;
- private final WriteAheadRepository<MapWaliRecord> wali;
-
- private final AtomicLong modifications = new AtomicLong(0L);
-
- public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException {
- wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
- wrapped = cacheToWrap;
- }
-
- synchronized void restore() throws IOException {
- final Collection<MapWaliRecord> recovered = wali.recoverRecords();
- for ( final MapWaliRecord record : recovered ) {
- if ( record.getUpdateType() == UpdateType.CREATE ) {
- wrapped.putIfAbsent(record.getKey(), record.getValue());
- }
- }
- }
-
- @Override
- public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
- final MapPutResult putResult = wrapped.putIfAbsent(key, value);
- if ( putResult.isSuccessful() ) {
- // The put was successful.
- final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
- final List<MapWaliRecord> records = new ArrayList<>();
- records.add(record);
-
- if ( putResult.getEvictedKey() != null ) {
- records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
- }
-
- wali.update(Collections.singletonList(record), false);
-
- final long modCount = modifications.getAndIncrement();
- if ( modCount > 0 && modCount % 100000 == 0 ) {
- wali.checkpoint();
- }
- }
-
- return putResult;
- }
-
- @Override
- public boolean containsKey(final ByteBuffer key) throws IOException {
- return wrapped.containsKey(key);
- }
-
- @Override
- public ByteBuffer get(final ByteBuffer key) throws IOException {
- return wrapped.get(key);
- }
-
- @Override
- public ByteBuffer remove(ByteBuffer key) throws IOException {
- final ByteBuffer removeResult = wrapped.remove(key);
- if ( removeResult != null ) {
- final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
- final List<MapWaliRecord> records = new ArrayList<>(1);
- records.add(record);
- wali.update(records, false);
-
- final long modCount = modifications.getAndIncrement();
- if ( modCount > 0 && modCount % 1000 == 0 ) {
- wali.checkpoint();
- }
- }
- return removeResult;
- }
-
-
- @Override
- public void shutdown() throws IOException {
- wali.shutdown();
- }
-
-
- private static class MapWaliRecord {
- private final UpdateType updateType;
- private final ByteBuffer key;
- private final ByteBuffer value;
-
- public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) {
- this.updateType = updateType;
- this.key = key;
- this.value = value;
- }
-
- public UpdateType getUpdateType() {
- return updateType;
- }
-
- public ByteBuffer getKey() {
- return key;
- }
-
- public ByteBuffer getValue() {
- return value;
- }
- }
-
- private static class Serde implements SerDe<MapWaliRecord> {
-
- @Override
- public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
- final UpdateType updateType = newRecordState.getUpdateType();
- if ( updateType == UpdateType.DELETE ) {
- out.write(0);
- } else {
- out.write(1);
- }
-
- final byte[] key = newRecordState.getKey().array();
- final byte[] value = newRecordState.getValue().array();
-
- out.writeInt(key.length);
- out.write(key);
- out.writeInt(value.length);
- out.write(value);
- }
-
- @Override
- public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException {
- serializeEdit(null, record, out);
- }
-
- @Override
- public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException {
- final int updateTypeValue = in.read();
- if ( updateTypeValue < 0 ) {
- throw new EOFException();
- }
-
- final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE);
-
- final int keySize = in.readInt();
- final byte[] key = new byte[keySize];
- in.readFully(key);
-
- final int valueSize = in.readInt();
- final byte[] value = new byte[valueSize];
- in.readFully(value);
-
- return new MapWaliRecord(updateType, ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- }
-
- @Override
- public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException {
- return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version);
- }
-
- @Override
- public Object getRecordIdentifier(final MapWaliRecord record) {
- return record.getKey();
- }
-
- @Override
- public UpdateType getUpdateType(final MapWaliRecord record) {
- return record.getUpdateType();
- }
-
- @Override
- public String getLocation(final MapWaliRecord record) {
- return null;
- }
-
- @Override
- public int getVersion() {
- return 1;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
deleted file mode 100644
index 10139f1..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.map;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleMapCache implements MapCache {
- private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
-
- private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
- private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- private final String serviceIdentifier;
-
- private final int maxSize;
-
- public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
- // need to change to ConcurrentMap as this is modified when only the readLock is held
- inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
- this.serviceIdentifier = serviceIdentifier;
- this.maxSize = maxSize;
- }
-
- @Override
- public String toString() {
- return "SimpleSetCache[service id=" + serviceIdentifier + "]";
- }
-
- // don't need synchronized because this method is only called when the writeLock is held, and all
- // public methods obtain either the read or write lock
- private MapCacheRecord evict() {
- if ( cache.size() < maxSize ) {
- return null;
- }
-
- final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
- final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
- cache.remove(valueToEvict);
-
- if ( logger.isDebugEnabled() ) {
- logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
- }
-
- return recordToEvict;
- }
-
- @Override
- public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) {
- writeLock.lock();
- try {
- final MapCacheRecord record = cache.get(key);
- if ( record == null ) {
- // Record is null. We will add.
- final MapCacheRecord evicted = evict();
- final MapCacheRecord newRecord = new MapCacheRecord(key, value);
- cache.put(key, newRecord);
- inverseCacheMap.put(newRecord, key);
-
- if ( evicted == null ) {
- return new MapPutResult(true, key, value, null, null, null);
- } else {
- return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
- }
- }
-
- // Record is not null. Increment hit count and return result indicating that record was not added.
- inverseCacheMap.remove(record);
- record.hit();
- inverseCacheMap.put(record, key);
-
- return new MapPutResult(false, key, value, record.getValue(), null, null);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public boolean containsKey(final ByteBuffer key) {
- readLock.lock();
- try {
- final MapCacheRecord record = cache.get(key);
- if ( record == null ) {
- return false;
- }
-
- inverseCacheMap.remove(record);
- record.hit();
- inverseCacheMap.put(record, key);
-
- return true;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public ByteBuffer get(final ByteBuffer key) {
- readLock.lock();
- try {
- final MapCacheRecord record = cache.get(key);
- if ( record == null ) {
- return null;
- }
-
- inverseCacheMap.remove(record);
- record.hit();
- inverseCacheMap.put(record, key);
-
- return record.getValue();
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public ByteBuffer remove(ByteBuffer key) throws IOException {
- writeLock.lock();
- try {
- final MapCacheRecord record = cache.remove(key);
- if (record == null) {
- return null;
- }
- inverseCacheMap.remove(record);
- return record.getValue();
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void shutdown() throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
deleted file mode 100644
index 4d75fc0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
-
-public class PersistentSetCache implements SetCache {
-
- private final SetCache wrapped;
- private final WriteAheadRepository<SetRecord> wali;
-
- private final AtomicLong modifications = new AtomicLong(0L);
-
- public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException {
- wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
- wrapped = cacheToWrap;
- }
-
- public synchronized void restore() throws IOException {
- final Collection<SetRecord> recovered = wali.recoverRecords();
- for ( final SetRecord record : recovered ) {
- if ( record.getUpdateType() == UpdateType.CREATE ) {
- addIfAbsent(record.getBuffer());
- }
- }
- }
-
- @Override
- public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException {
- final SetCacheResult removeResult = wrapped.remove(value);
- if ( removeResult.getResult() ) {
- final SetRecord record = new SetRecord(UpdateType.DELETE, value);
- final List<SetRecord> records = new ArrayList<>();
- records.add(record);
- wali.update(records, false);
-
- final long modCount = modifications.getAndIncrement();
- if ( modCount > 0 && modCount % 1000 == 0 ) {
- wali.checkpoint();
- }
- }
-
- return removeResult;
- }
-
- @Override
- public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException {
- final SetCacheResult addResult = wrapped.addIfAbsent(value);
- if ( addResult.getResult() ) {
- final SetRecord record = new SetRecord(UpdateType.CREATE, value);
- final List<SetRecord> records = new ArrayList<>();
- records.add(record);
-
- final SetCacheRecord evictedRecord = addResult.getEvictedRecord();
- if ( evictedRecord != null ) {
- records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue()));
- }
-
- wali.update(records, false);
-
- final long modCount = modifications.getAndIncrement();
- if ( modCount > 0 && modCount % 1000 == 0 ) {
- wali.checkpoint();
- }
- }
-
- return addResult;
- }
-
- @Override
- public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException {
- return wrapped.contains(value);
- }
-
- @Override
- public void shutdown() throws IOException {
- wali.shutdown();
- }
-
- private static class SetRecord {
- private final UpdateType updateType;
- private final ByteBuffer value;
-
- public SetRecord(final UpdateType updateType, final ByteBuffer value) {
- this.updateType = updateType;
- this.value = value;
- }
-
- public UpdateType getUpdateType() {
- return updateType;
- }
-
- public ByteBuffer getBuffer() {
- return value;
- }
-
- public byte[] getData() {
- return value.array();
- }
- }
-
- private static class Serde implements SerDe<SetRecord> {
-
- @Override
- public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException {
- final UpdateType updateType = newRecordState.getUpdateType();
- if ( updateType == UpdateType.DELETE ) {
- out.write(0);
- } else {
- out.write(1);
- }
-
- final byte[] data = newRecordState.getData();
- out.writeInt(data.length);
- out.write(newRecordState.getData());
- }
-
- @Override
- public void serializeRecord(SetRecord record, DataOutputStream out) throws IOException {
- serializeEdit(null, record, out);
- }
-
- @Override
- public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException {
- final int value = in.read();
- if ( value < 0 ) {
- throw new EOFException();
- }
-
- final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE);
-
- final int size = in.readInt();
- final byte[] data = new byte[size];
- in.readFully(data);
-
- return new SetRecord(updateType, ByteBuffer.wrap(data));
- }
-
- @Override
- public SetRecord deserializeRecord(DataInputStream in, int version) throws IOException {
- return deserializeEdit(in, new HashMap<Object, SetRecord>(), version);
- }
-
- @Override
- public Object getRecordIdentifier(final SetRecord record) {
- return record.getBuffer();
- }
-
- @Override
- public UpdateType getUpdateType(final SetRecord record) {
- return record.getUpdateType();
- }
-
- @Override
- public String getLocation(final SetRecord record) {
- return null;
- }
-
- @Override
- public int getVersion() {
- return 1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
deleted file mode 100644
index bf6ae3e..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface SetCache {
-
- SetCacheResult remove(ByteBuffer value) throws IOException;
- SetCacheResult addIfAbsent(ByteBuffer value) throws IOException;
- SetCacheResult contains(ByteBuffer value) throws IOException;
- void shutdown() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
deleted file mode 100644
index 20b6fae..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.distributed.cache.server.CacheRecord;
-
-public class SetCacheRecord extends CacheRecord {
- private final ByteBuffer value;
-
- public SetCacheRecord(final ByteBuffer value) {
- this.value = value;
- }
-
- public ByteBuffer getValue() {
- return value;
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if ( this == obj ) {
- return true;
- }
-
- if (obj instanceof SetCacheRecord) {
- return value.equals(((SetCacheRecord) obj).value);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
deleted file mode 100644
index 732c4f0..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.server.set;
-
-
-
-public class SetCacheResult {
- private final boolean result;
- private final SetCacheRecord stats;
- private final SetCacheRecord evictedRecord;
-
- public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) {
- this.result = result;
- this.stats = stats;
- this.evictedRecord = evictedRecord;
- }
-
- public boolean getResult() {
- return result;
- }
-
- public SetCacheRecord getRecord() {
- return stats;
- }
-
- public SetCacheRecord getEvictedRecord() {
- return evictedRecord;
- }
-}