You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/16 17:15:29 UTC

nifi git commit: NIFI-3214: Added fetch and replace to DistributedMapCache

Repository: nifi
Updated Branches:
  refs/heads/master 8c7539b20 -> ef54a8ec6


NIFI-3214: Added fetch and replace to DistributedMapCache

- Using fetch and replace together can provide optimistic locking for
  concurrency control.
- Added fetch to get cache entry with its meta data such as revision
  number.
- Added replace to update cache only if it has not been updated.
- Added Map Cache protocol version 2 for those new operations.
- Existing operations such as get or put can work with protocol version
  1.

This closes #1410.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef54a8ec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef54a8ec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef54a8ec

Branch: refs/heads/master
Commit: ef54a8ec69b6b363eedd50944c48d5d013cf5ccc
Parents: 8c7539b
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Jan 11 15:57:40 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 16 12:15:06 2017 -0500

----------------------------------------------------------------------
 .../client/AtomicDistributedMapCacheClient.java |  76 +++++++++
 .../distributed/cache/client/CommsSession.java  |   4 +
 .../DistributedMapCacheClientService.java       |  61 ++++++-
 .../DistributedSetCacheClientService.java       |   1 +
 .../cache/client/SSLCommsSession.java           |  11 ++
 .../cache/client/StandardCacheEntry.java        |  46 ++++++
 .../cache/client/StandardCommsSession.java      |  13 ++
 .../cache/protocol/ProtocolHandshake.java       |  14 ++
 .../cache/server/AbstractCacheServer.java       |  16 +-
 .../server/map/DistributedMapCacheServer.java   |   7 +-
 .../distributed/cache/server/map/MapCache.java  |   4 +
 .../cache/server/map/MapCacheRecord.java        |  18 ++-
 .../cache/server/map/MapCacheServer.java        |  43 ++++-
 .../cache/server/map/MapPutResult.java          |  38 ++---
 .../cache/server/map/PersistentMapCache.java    |  43 +++--
 .../cache/server/map/SimpleMapCache.java        |  84 +++++++---
 .../cache/server/TestServerAndClient.java       | 157 +++++++++++++++++++
 .../cache/server/map/TestSimpleMapCache.java    | 132 ++++++++++++++++
 18 files changed, 682 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java
new file mode 100644
index 0000000..d0b77e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+
+import java.io.IOException;
+
+/**
+ * <p>This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Map Map}.
+ *
+ * <p>In addition to the API defined in the {@link DistributedMapCacheClient} super interface,
+ * this interface provides methods for concurrent atomic updates that were added in Map Cache protocol version 2.
+ *
+ * <p>If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException.
+ */
+@Tags({"distributed", "client", "cluster", "map", "cache"})
+@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows "
+        + "multiple nodes to coordinate state with a single remote entity.")
+public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient {
+
+    interface CacheEntry<K, V> {
+
+        long getRevision();
+
+        K getKey();
+
+        V getValue();
+
+    }
+
+    /**
+     * Fetch a CacheEntry with a key.
+     * @param <K> the key type
+     * @param <V> the value type
+     * @param key the key to lookup in the map
+     * @param keySerializer key serializer
+     * @param valueDeserializer value deserializer
+     * @return A CacheEntry instance if one exists, otherwise <code>null</code>.
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
+
+    /**
+     * Replace an existing key with new value.
+     * @param <K> the key type
+     * @param <V> the value type
+     * @param key the key to replace
+     * @param value the new value for the key
+     * @param keySerializer key serializer
+     * @param valueSerializer value serializer
+     * @param revision a revision that was retrieved by a preceding fetch operation; if the key has since been updated by another client,
+     *                 this will not match the one on the server, and the replace operation will not be performed.
+     *                 If there's no existing entry for the key, any revision can replace the key.
+     * @return true only if the key is replaced.
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    <K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
index c035485..83b3b90 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -43,4 +43,8 @@ public interface CommsSession extends Closeable {
     long getTimeout(TimeUnit timeUnit);
 
     SSLContext getSSLContext();
+
+    int getProtocolVersion();
+
+    void setProtocolVersion(final int protocolVersion);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index e78ae0b..5379bc1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,8 +41,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
 @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
     + "between nodes in a NiFi cluster")
-public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient {
 
     private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
 
@@ -216,6 +216,58 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         });
     }
 
+    @Override
+    public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(session -> {
+            validateProtocolVersion(session, 2);
+
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF("fetch");
+
+            serialize(key, keySerializer, dos);
+            dos.flush();
+
+            // read response
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            final long revision = dis.readLong();
+            final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+
+            if (revision < 0) {
+                // This indicates that key was not found.
+                return null;
+            }
+
+            final StandardCacheEntry<K, V> standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision);
+            return standardCacheEntry;
+        });
+    }
+
+    private void validateProtocolVersion(final CommsSession session, final int requiredProtocolVersion) {
+        if (session.getProtocolVersion() < requiredProtocolVersion) {
+            throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + requiredProtocolVersion);
+        }
+    }
+
+    @Override
+    public <K, V> boolean replace(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long revision) throws IOException {
+        return withCommsSession(session -> {
+            validateProtocolVersion(session, 2);
+
+            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+            dos.writeUTF("replace");
+
+            serialize(key, keySerializer, dos);
+            dos.writeLong(revision);
+            serialize(value, valueSerializer, dos);
+
+            dos.flush();
+
+            // read response
+            final DataInputStream dis = new DataInputStream(session.getInputStream());
+            return dis.readBoolean();
+        });
+    }
+
     private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
         final int responseLength = dis.readInt();
         final byte[] responseBuffer = new byte[responseLength];
@@ -247,9 +299,10 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         }
 
         session = createCommsSession(configContext);
-        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
         try {
             ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+            session.setProtocolVersion(versionNegotiator.getVersion());
         } catch (final HandshakeException e) {
             try {
                 session.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index 82ab643..c1fa274 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -127,6 +127,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
         final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
         try {
             ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+            session.setProtocolVersion(versionNegotiator.getVersion());
         } catch (final HandshakeException e) {
             IOUtils.closeQuietly(session);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
index 3d400bb..7576c5f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -42,6 +42,8 @@ public class SSLCommsSession implements CommsSession {
     private final SSLSocketChannelOutputStream out;
     private final BufferedOutputStream bufferedOut;
 
+    private int protocolVersion;
+
     public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
         sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
 
@@ -106,4 +108,13 @@ public class SSLCommsSession implements CommsSession {
         return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public int getProtocolVersion() {
+        return protocolVersion;
+    }
+
+    @Override
+    public void setProtocolVersion(final int protocolVersion) {
+        this.protocolVersion = protocolVersion;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java
new file mode 100644
index 0000000..b4949d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+public class StandardCacheEntry<K,V> implements AtomicDistributedMapCacheClient.CacheEntry<K,V> {
+
+    private final K key;
+    private final V value;
+    private final long revision;
+
+
+    public StandardCacheEntry(final K key, final V value, final long revision) {
+        this.key = key;
+        this.value = value;
+        this.revision = revision;
+    }
+
+    @Override
+    public long getRevision() {
+        return revision;
+    }
+
+    @Override
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public V getValue() {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index b2a5c1d..6a8ee45 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -45,6 +45,8 @@ public class StandardCommsSession implements CommsSession {
     private final SocketChannelOutputStream out;
     private final InterruptableOutputStream bufferedOut;
 
+    private int protocolVersion;
+
     public StandardCommsSession(final String hostname, final int port) throws IOException {
         socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
         socketChannel.configureBlocking(false);
@@ -122,4 +124,15 @@ public class StandardCommsSession implements CommsSession {
     public long getTimeout(final TimeUnit timeUnit) {
         return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
     }
+
+    @Override
+    public int getProtocolVersion() {
+        return protocolVersion;
+    }
+
+    @Override
+    public void setProtocolVersion(final int protocolVersion) {
+        this.protocolVersion = protocolVersion;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
index f36ac15..3df2f09 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -33,6 +33,19 @@ public class ProtocolHandshake {
     public static final int DIFFERENT_RESOURCE_VERSION = 21;
     public static final int ABORT = 255;
 
+    /**
+     * <p>Initiate handshake to ensure client and server can communicate with the same protocol.
+     * If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
+     *
+     * <p>DistributedMapCache version history:<ul>
+     *     <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
+     *     <li>1: Initial version.</li>
+     * </ul></p>
+     *
+     * <p>DistributedSetCache version history:<ul>
+     *     <li>1: Initial version.</li>
+     * </ul></p>
+     */
     public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
         final DataInputStream dis = new DataInputStream(in);
         final DataOutputStream dos = new DataOutputStream(out);
@@ -85,6 +98,7 @@ public class ProtocolHandshake {
 
                 // Attempt negotiation of resource based on our new preferred version.
                 initiateVersionNegotiation(negotiator, dis, dos);
+                return;
             case ABORT:
                 throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
             default:

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index 5c5a9cb..1968162 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.distributed.cache.server;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,8 +32,6 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
 import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
@@ -121,9 +121,9 @@ public abstract class AbstractCacheServer implements CacheServer {
                                 return;
                             }
                             try (final InputStream in = new BufferedInputStream(rawInputStream);
-                                final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+                                 final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
 
-                                final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+                                final VersionNegotiator versionNegotiator = getVersionNegotiator();
 
                                 ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
 
@@ -163,6 +163,14 @@ public abstract class AbstractCacheServer implements CacheServer {
         thread.start();
     }
 
+    /**
+     * Refer to {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
+     * for details of each version's enhancements.
+     */
+    protected StandardVersionNegotiator getVersionNegotiator() {
+        return new StandardVersionNegotiator(1);
+    }
+
     @Override
     public void stop() throws IOException {
         stopped = true;

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
index dce7ccd..6b85155 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.distributed.cache.server.map;
 
 import java.io.File;
+import java.io.IOException;
 
 import javax.net.ssl.SSLContext;
 
@@ -69,10 +70,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
         try {
             final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
 
-            return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+            return createMapCacheServer(port, maxSize, sslContext, evictionPolicy, persistenceDir);
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
+        return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index e9c6f1d..67f5bab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -31,5 +31,9 @@ public interface MapCache {
 
     ByteBuffer remove(ByteBuffer key) throws IOException;
 
+    MapCacheRecord fetch(ByteBuffer key) throws IOException;
+
+    MapPutResult replace(MapCacheRecord record) throws IOException;
+
     void shutdown() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
index ff032b1..84af198 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.distributed.cache.server.map;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.nifi.distributed.cache.server.CacheRecord;
 
@@ -24,10 +25,19 @@ public class MapCacheRecord extends CacheRecord {
 
     private final ByteBuffer key;
     private final ByteBuffer value;
+    /**
+     * Revision is a number that increases every time the key is updated.
+     */
+    private final long revision;
 
     public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
+        this(key, value, -1L);
+    }
+
+    public MapCacheRecord(final ByteBuffer key, final ByteBuffer value, final long revision) {
         this.key = key;
         this.value = value;
+        this.revision = revision;
     }
 
     public ByteBuffer getKey() {
@@ -40,7 +50,7 @@ public class MapCacheRecord extends CacheRecord {
 
     @Override
     public int hashCode() {
-        return 2938476 + key.hashCode() * value.hashCode();
+        return Arrays.hashCode(new Object[]{key, value, revision});
     }
 
     @Override
@@ -51,9 +61,13 @@ public class MapCacheRecord extends CacheRecord {
 
         if (obj instanceof MapCacheRecord) {
             final MapCacheRecord that = ((MapCacheRecord) obj);
-            return key.equals(that.key) && value.equals(that.value);
+            return key.equals(that.key) && value.equals(that.value) && revision == that.revision;
         }
 
         return false;
     }
+
+    public long getRevision() {
+        return revision; // -1 when the record was built through the two-argument constructor
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 13ed0df..99eacd7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.distributed.cache.server.map;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,7 +28,8 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
 import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
 
 public class MapCacheServer extends AbstractCacheServer {
 
@@ -48,6 +50,14 @@ public class MapCacheServer extends AbstractCacheServer {
         }
     }
 
+    /**
+     * See {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
+     * for details of the enhancements introduced by each protocol version.
+     */
+    protected StandardVersionNegotiator getVersionNegotiator() {
+        return new StandardVersionNegotiator(2, 1);
+    }
+
     @Override
     protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
         final DataInputStream dis = new DataInputStream(in);
@@ -88,7 +98,7 @@ public class MapCacheServer extends AbstractCacheServer {
                     dos.writeInt(0);
                 } else {
                     // we didn't put. Write back the previous value
-                    final byte[] byteArray = putResult.getExistingValue().array();
+                    final byte[] byteArray = putResult.getExisting().getValue().array();
                     dos.writeInt(byteArray.length);
                     dos.write(byteArray);
                 }
@@ -99,10 +109,10 @@ public class MapCacheServer extends AbstractCacheServer {
                 final byte[] key = readValue(dis);
                 final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
                 if (existingValue == null) {
-                    // there was no existing value; we did a "put".
+                    // there was no existing value.
                     dos.writeInt(0);
                 } else {
-                    // a value already existed. we did not update the map
+                    // a value already existed.
                     final byte[] byteArray = existingValue.array();
                     dos.writeInt(byteArray.length);
                     dos.write(byteArray);
@@ -116,6 +126,31 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(removed);
                 break;
             }
+            case "fetch": {
+                final byte[] key = readValue(dis);
+                final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
+                if (existing == null) {
+                    // there was no existing value.
+                    dos.writeLong(-1);
+                    dos.writeInt(0);
+                } else {
+                    // a value already existed.
+                    dos.writeLong(existing.getRevision());
+                    final byte[] byteArray = existing.getValue().array();
+                    dos.writeInt(byteArray.length);
+                    dos.write(byteArray);
+                }
+
+                break;
+            }
+            case "replace": {
+                final byte[] key = readValue(dis);
+                final long revision = dis.readLong();
+                final byte[] value = readValue(dis);
+                final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
+                dos.writeBoolean(result.isSuccessful());
+                break;
+            }
             default: {
                 throw new IOException("Illegal Request");
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
index d0055f3..3750aee 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
@@ -16,45 +16,33 @@
  */
 package org.apache.nifi.distributed.cache.server.map;
 
-import java.nio.ByteBuffer;
-
 public class MapPutResult {
 
     private final boolean successful;
-    private final ByteBuffer key, value;
-    private final ByteBuffer existingValue;
-    private final ByteBuffer evictedKey, evictedValue;
+    private final MapCacheRecord record;
+    private final MapCacheRecord existing;
+    private final MapCacheRecord evicted;
 
-    public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
+    public MapPutResult(boolean successful, MapCacheRecord record, MapCacheRecord existing, MapCacheRecord evicted) {
         this.successful = successful;
-        this.key = key;
-        this.value = value;
-        this.existingValue = existingValue;
-        this.evictedKey = evictedKey;
-        this.evictedValue = evictedValue;
+        this.record = record;
+        this.existing = existing;
+        this.evicted = evicted;
     }
 
     public boolean isSuccessful() {
         return successful;
     }
 
-    public ByteBuffer getKey() {
-        return key;
-    }
-
-    public ByteBuffer getValue() {
-        return value;
-    }
-
-    public ByteBuffer getExistingValue() {
-        return existingValue;
+    public MapCacheRecord getRecord() {
+        return record;
     }
 
-    public ByteBuffer getEvictedKey() {
-        return evictedKey;
+    public MapCacheRecord getExisting() {
+        return existing;
     }
 
-    public ByteBuffer getEvictedValue() {
-        return evictedValue;
+    public MapCacheRecord getEvicted() {
+        return evicted;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index c2fc0d7..62deae5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -57,38 +57,27 @@ public class PersistentMapCache implements MapCache {
     @Override
     public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
         final MapPutResult putResult = wrapped.putIfAbsent(key, value);
-        if (putResult.isSuccessful()) {
-            // The put was successful.
-            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
-            final List<MapWaliRecord> records = new ArrayList<>();
-            records.add(record);
-
-            if (putResult.getEvictedKey() != null) {
-                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
-            }
-
-            wali.update(Collections.singletonList(record), false);
-
-            final long modCount = modifications.getAndIncrement();
-            if (modCount > 0 && modCount % 100000 == 0) {
-                wali.checkpoint();
-            }
-        }
-
+        putWriteAheadLog(key, value, putResult);
         return putResult;
     }
 
     @Override
     public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
         final MapPutResult putResult = wrapped.put(key, value);
+        putWriteAheadLog(key, value, putResult);
+        return putResult;
+    }
+
+    protected void putWriteAheadLog(ByteBuffer key, ByteBuffer value, MapPutResult putResult) throws IOException {
         if ( putResult.isSuccessful() ) {
             // The put was successful.
             final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
             final List<MapWaliRecord> records = new ArrayList<>();
             records.add(record);
 
-            if ( putResult.getEvictedKey() != null ) {
-                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+            final MapCacheRecord evicted = putResult.getEvicted();
+            if ( evicted != null ) {
+                records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
             }
 
             wali.update(Collections.singletonList(record), false);
@@ -98,8 +87,6 @@ public class PersistentMapCache implements MapCache {
                 wali.checkpoint();
             }
         }
-
-        return putResult;
     }
 
     @Override
@@ -113,6 +100,18 @@ public class PersistentMapCache implements MapCache {
     }
 
     @Override
+    public MapCacheRecord fetch(ByteBuffer key) throws IOException {
+        return wrapped.fetch(key); // plain delegation: nothing is written to the write-ahead log here
+    }
+
+    @Override
+    public MapPutResult replace(MapCacheRecord record) throws IOException {
+        final MapPutResult putResult = wrapped.replace(record);
+        putWriteAheadLog(record.getKey(), record.getValue(), putResult); // putWriteAheadLog logs nothing unless putResult.isSuccessful()
+        return putResult;
+    }
+
+    @Override
     public ByteBuffer remove(final ByteBuffer key) throws IOException {
         final ByteBuffer removeResult = wrapped.remove(key);
         if (removeResult != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index b167c62..ebcf91a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -84,16 +84,7 @@ public class SimpleMapCache implements MapCache {
             final MapCacheRecord record = cache.get(key);
             if (record == null) {
                 // Record is null. We will add.
-                final MapCacheRecord evicted = evict();
-                final MapCacheRecord newRecord = new MapCacheRecord(key, value);
-                cache.put(key, newRecord);
-                inverseCacheMap.put(newRecord, key);
-
-                if (evicted == null) {
-                    return new MapPutResult(true, key, value, null, null, null);
-                } else {
-                    return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
-                }
+                return put(key, value, record);
             }
 
             // Record is not null. Increment hit count and return result indicating that record was not added.
@@ -101,29 +92,37 @@ public class SimpleMapCache implements MapCache {
             record.hit();
             inverseCacheMap.put(record, key);
 
-            return new MapPutResult(false, key, value, record.getValue(), null, null);
+            return new MapPutResult(false, record, record, null);
         } finally {
             writeLock.unlock();
         }
     }
 
+    private MapPutResult put(final ByteBuffer key, final ByteBuffer value, final MapCacheRecord existing) {
+        // evict if we need to in order to make room for a new entry.
+        final MapCacheRecord evicted = evict();
 
-    @Override
-    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
-        writeLock.lock();
-        try {
-            // evict if we need to in order to make room for a new entry.
-            final MapCacheRecord evicted = evict();
+        final long revision;
+        if (existing == null) {
+            revision = 0;
+        } else {
+            revision = existing.getRevision() + 1;
+            inverseCacheMap.remove(existing);
+        }
 
-            final MapCacheRecord record = new MapCacheRecord(key, value);
-            final MapCacheRecord existing = cache.put(key, record);
-            inverseCacheMap.put(record, key);
+        final MapCacheRecord record = new MapCacheRecord(key, value, revision);
+        cache.put(key, record);
+        inverseCacheMap.put(record, key);
 
-            final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
-            final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
-            final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
+        return new MapPutResult(true, record, existing, evicted);
+    }
 
-            return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
+    @Override
+    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
+        writeLock.lock();
+        try {
+            final MapCacheRecord existing = cache.get(key);
+            return put(key, value, existing);
         } finally {
             writeLock.unlock();
         }
@@ -183,6 +182,43 @@ public class SimpleMapCache implements MapCache {
     }
 
     @Override
+    public MapCacheRecord fetch(ByteBuffer key) throws IOException {
+        readLock.lock();
+        try {
+            final MapCacheRecord record = cache.get(key);
+            if (record == null) {
+                return null; // no entry is stored under this key
+            }
+
+            inverseCacheMap.remove(record); // NOTE(review): inverseCacheMap is modified while only the read lock is held — confirm the map tolerates concurrent updates
+            record.hit(); // presumably changes how the record sorts in inverseCacheMap, hence the remove/put around it — verify
+            inverseCacheMap.put(record, key);
+
+            return record; // the whole record (not just the value) so callers can read getRevision()
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public MapPutResult replace(MapCacheRecord inputRecord) throws IOException {
+        writeLock.lock(); // held across the revision check and the put so the two happen as one step
+        try {
+            final ByteBuffer key = inputRecord.getKey();
+            final ByteBuffer value = inputRecord.getValue();
+            final MapCacheRecord existing = fetch(key); // null when the key is not in the cache
+            if (existing != null && inputRecord.getRevision() != existing.getRevision()) {
+                // Revision mismatch: the key has been updated by another operation, so the replace is rejected.
+                return new MapPutResult(false, inputRecord, existing, null);
+            }
+
+            return put(key, value, existing); // key absent or revision matches: store the new value
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
     public void shutdown() throws IOException {
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 82e4a99..c8f9478 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,14 +34,17 @@ import java.util.Map;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
 import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
+import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
@@ -52,6 +56,8 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+
 public class TestServerAndClient {
 
     private static Logger LOGGER;
@@ -450,6 +456,157 @@ public class TestServerAndClient {
         server.shutdownServer();
     }
 
+    @Test
+    public void testOptimisticLock() throws Exception {
+
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+                SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+
+        // Create server
+        final DistributedMapCacheServer server = new MapServer();
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
+        client1.initialize(clientInitContext1);
+
+        DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
+        client2.initialize(clientInitContext2);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+        MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+        client1.cacheConfig(clientContext1);
+        MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
+        client2.cacheConfig(clientContext2);
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+        final String key = "test-optimistic-lock";
+
+        // Ensure there's no existing key
+        assertFalse(client1.containsKey(key, stringSerializer));
+        assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
+
+        // Client 1 inserts the key.
+        client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
+
+        // Clients 1 and 2 both fetch the key and see the same value and revision
+        AtomicDistributedMapCacheClient.CacheEntry<String, String> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
+        AtomicDistributedMapCacheClient.CacheEntry<String, String> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+        assertEquals(0, c1.getRevision());
+        assertEquals("valueC1-0", c1.getValue());
+        assertEquals(0, c2.getRevision());
+        assertEquals("valueC1-0", c2.getValue());
+
+        // Client 1 replaces the value using the revision it fetched
+        boolean c1Result = client1.replace(key, "valueC1-1", stringSerializer, stringSerializer, c1.getRevision());
+        assertTrue("C1 should be able to replace the key", c1Result);
+        // Client 2 tries to replace with the old, now stale, revision
+        boolean c2Result = client2.replace(key, "valueC2-1", stringSerializer, stringSerializer, c2.getRevision());
+        assertFalse("C2 shouldn't be able to replace the key", c2Result);
+
+        // Client 2 fetches the key again
+        c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+        assertEquals("valueC1-1", c2.getValue());
+        assertEquals(1, c2.getRevision());
+
+        // Now, Client 2 knows the correct revision so it can replace the key
+        c2Result = client2.replace(key, "valueC2-2", stringSerializer, stringSerializer, c2.getRevision());
+        assertTrue("C2 should be able to replace the key", c2Result);
+
+        // Assert the cache
+        c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+        assertEquals("valueC2-2", c2.getValue());
+        assertEquals(2, c2.getRevision());
+
+        client1.close();
+        client2.close();
+        server.shutdownServer();
+    }
+
+    @Test
+    public void testBackwardCompatibility() throws Exception {
+
+        /**
+         * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+         * See:  https://issues.apache.org/jira/browse/NIFI-437
+         */
+        Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+                SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+        LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+
+        final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+
+        // Create a server that only supports protocol version 1.
+        final DistributedMapCacheServer server = new MapServer() {
+            @Override
+            protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
+                return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir) {
+                    @Override
+                    protected StandardVersionNegotiator getVersionNegotiator() {
+                        return new StandardVersionNegotiator(1);
+                    }
+                };
+            }
+        };
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
+
+        DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+        MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
+        client.initialize(clientInitContext1);
+
+        final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+        clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+        clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+        MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+        client.cacheConfig(clientContext);
+
+        final Serializer<String> stringSerializer = new StringSerializer();
+        final Deserializer<String> stringDeserializer = new StringDeserializer();
+
+        final String key = "test-backward-compatibility";
+
+        // Version 1 operations should work
+        client.put(key, "value1", stringSerializer, stringSerializer);
+        assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
+
+        assertTrue(client.containsKey(key, stringSerializer));
+
+        try {
+            client.fetch(key, stringSerializer, stringDeserializer);
+            fail("Version 2 operations should NOT work.");
+        } catch (UnsupportedOperationException e) { // expected: fetch must be refused against a version-1-only server
+        }
+
+        try {
+            client.replace(key, "value2", stringSerializer, stringSerializer, 0L);
+            fail("Version 2 operations should NOT work.");
+        } catch (UnsupportedOperationException e) { // expected: replace must be refused against a version-1-only server
+        }
+
+        client.close();
+        server.shutdownServer();
+    }
+
+
     private void waitABit() {
         try {
             Thread.sleep(10L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef54a8ec/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
new file mode 100644
index 0000000..2e19714
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SimpleMapCache} covering plain put/get/remove behavior
+ * and the revision-based {@code fetch}/{@code replace} operations.
+ *
+ * <p>All keys and values are encoded and decoded as UTF-8 explicitly so the
+ * test data does not depend on the platform default charset.</p>
+ */
+public class TestSimpleMapCache {
+
+    /**
+     * Wraps the UTF-8 bytes of the given text in a {@link ByteBuffer}.
+     *
+     * @param text the text to encode
+     * @return a buffer backed by exactly the encoded bytes of {@code text}
+     */
+    private static ByteBuffer toBuffer(final String text) {
+        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Decodes the whole backing array of the given buffer as UTF-8 text.
+     *
+     * @param buffer an array-backed buffer; every buffer created by this test comes from
+     *               {@link #toBuffer(String)}, whose backing array holds only the content
+     * @return the decoded text
+     */
+    private static String toText(final ByteBuffer buffer) {
+        return new String(buffer.array(), StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Exercises get, fetch, put and remove on a cache constructed with
+     * {@code ("service-id", 2, EvictionPolicy.FIFO)}, checking the revision reported
+     * after each put, the previous record returned when a key is overwritten, and the
+     * record evicted when a third distinct key is added.
+     */
+    @Test
+    public void testBasicOperations() throws Exception {
+        final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
+
+        final ByteBuffer key1 = toBuffer("key1");
+        final ByteBuffer key2 = toBuffer("key2");
+        final ByteBuffer key3 = toBuffer("key3");
+        final ByteBuffer value2 = toBuffer("value2-0");
+        final ByteBuffer value3 = toBuffer("value3-0");
+
+        // Initial state.
+        assertNull(cache.get(key1));
+        assertNull(cache.fetch(key1));
+
+        // Put the 1st key.
+        MapPutResult putResult = cache.put(key1, toBuffer("value1-0"));
+        assertTrue(putResult.isSuccessful());
+        assertNull(putResult.getExisting());
+        assertNull(putResult.getEvicted());
+        assertEquals(0, putResult.getRecord().getRevision());
+
+        // Update the same key; the previous record must be reported and the revision bumped.
+        putResult = cache.put(key1, toBuffer("value1-1"));
+        assertTrue(putResult.isSuccessful());
+        assertNotNull(putResult.getExisting());
+        assertEquals(1, putResult.getRecord().getRevision());
+        assertEquals(key1, putResult.getExisting().getKey());
+        assertEquals("value1-0", toText(putResult.getExisting().getValue()));
+        assertNull(putResult.getEvicted());
+
+        // Put the 2nd key.
+        putResult = cache.put(key2, value2);
+        assertTrue(putResult.isSuccessful());
+        assertNull(putResult.getExisting());
+        assertNull(putResult.getEvicted());
+        assertEquals(0, putResult.getRecord().getRevision());
+
+        // Put the 3rd key; this is expected to push out the oldest entry (key1).
+        putResult = cache.put(key3, value3);
+        assertTrue(putResult.isSuccessful());
+        assertNull(putResult.getExisting());
+        assertNotNull("The first key should be evicted", putResult.getEvicted());
+        assertEquals("key1", toText(putResult.getEvicted().getKey()));
+        assertEquals("value1-1", toText(putResult.getEvicted().getValue()));
+        assertEquals(0, putResult.getRecord().getRevision());
+
+        // Delete 2nd key.
+        final ByteBuffer removed = cache.remove(key2);
+        assertNotNull(removed);
+        assertEquals("value2-0", toText(removed));
+
+        // Put the 2nd key again; a re-created entry must not inherit the old revision.
+        putResult = cache.put(key2, value2);
+        assertTrue(putResult.isSuccessful());
+        assertNull(putResult.getExisting());
+        assertNull(putResult.getEvicted());
+        assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
+
+    }
+
+    /**
+     * Simulates two clients competing for the same key through {@code fetch} and
+     * {@code replace}: a replace is expected to succeed for a missing key or a matching
+     * revision, and to fail when the key already exists or the revision is stale.
+     */
+    @Test
+    public void testOptimisticLock() throws Exception {
+
+        final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
+
+        final ByteBuffer key = toBuffer("key1");
+
+        assertNull("If there's no existing key, fetch should return null.", cache.fetch(key));
+
+        // Client 1 inserts the key.
+        MapCacheRecord c1 = new MapCacheRecord(key, toBuffer("valueC1-0"));
+        MapPutResult putResult = cache.replace(c1);
+        assertTrue("Replace should succeed if there's no existing key.", putResult.isSuccessful());
+
+        // Client 2 tries to insert the same key without having fetched it first.
+        MapCacheRecord c2 = new MapCacheRecord(key, toBuffer("valueC2-0"));
+        putResult = cache.replace(c2);
+        assertFalse("Replace should fail.", putResult.isSuccessful());
+
+        // Client 1 and 2 fetch the key
+        c1 = cache.fetch(key);
+        c2 = cache.fetch(key);
+        assertEquals(0, c1.getRevision());
+        assertEquals(0, c2.getRevision());
+
+        // Client 1 replace
+        putResult = cache.replace(new MapCacheRecord(key, toBuffer("valueC1-1"), c1.getRevision()));
+        assertTrue("Replace should succeed since revision matched.", putResult.isSuccessful());
+        assertEquals(1, putResult.getRecord().getRevision());
+
+        // Client 2 replace with the old revision
+        putResult = cache.replace(new MapCacheRecord(key, toBuffer("valueC2-1"), c2.getRevision()));
+        assertFalse("Replace should fail.", putResult.isSuccessful());
+    }
+
+}