Posted to commits@nifi.apache.org by ma...@apache.org on 2017/03/30 14:31:25 UTC

nifi git commit: NIFI-3636: Lazily copy FlowFile Attributes Hash Map instead of doing so eagerly.

Repository: nifi
Updated Branches:
  refs/heads/master a7bf683a0 -> 3aa1db6ee


NIFI-3636: Lazily copy FlowFile Attributes Hash Map instead of doing so eagerly.

Signed-off-by: Matt Burgess <ma...@apache.org>

NIFI-3257: Added additional logging of timing information when replicating requests across the cluster, in order to gain insight into why some requests take so long to replicate

Signed-off-by: Matt Burgess <ma...@apache.org>

NIFI-3649: Buffer node responses, up to a maximum buffer size, when replicating HTTP requests

Signed-off-by: Matt Burgess <ma...@apache.org>

NIFI-3636: Added a unit test to ensure that FlowFile attribute maps are copied when appropriate

Signed-off-by: Matt Burgess <ma...@apache.org>

NIFI-3636: Removed a patch file that should not have been included in the commit

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1612


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3aa1db6e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3aa1db6e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3aa1db6e

Branch: refs/heads/master
Commit: 3aa1db6ee5d952aa67601e460331940f5e20a166
Parents: a7bf683
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Mar 22 17:21:51 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu Mar 30 09:53:39 2017 -0400

----------------------------------------------------------------------
 .../StandardAsyncClusterResponse.java           |  98 +++++++++++--
 .../ThreadPoolRequestReplicator.java            | 119 ++++++++++------
 .../nifi/cluster/manager/NodeResponse.java      |  35 +++--
 .../TestThreadPoolRequestReplicator.java        |   6 +-
 .../apache/nifi/cluster/integration/Node.java   |   5 +
 ...g.apache.nifi.components.state.StateProvider |  15 ++
 .../repository/StandardFlowFileRecord.java      |  32 +++--
 .../repository/TestStandardFlowFileRecord.java  |  60 ++++++++
 .../apache/nifi/web/NiFiServiceFacadeLock.java  | 136 +++++++------------
 .../nifi/web/api/ApplicationResource.java       |  19 ++-
 10 files changed, 362 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 318b1a0..ff94eba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -17,24 +17,30 @@
 
 package org.apache.nifi.cluster.coordination.http.replication;
 
-import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
+    private static final int DEFAULT_RESPONSE_BUFFER_SIZE = 1024 * 1024;
+
+    public static final String VERIFICATION_PHASE = "Verification Phase";
+    public static final String COMMIT_PHASE = "Execution Phase";
+    public static final String ONLY_PHASE = "Only Phase";
 
     private final String id;
     private final Set<NodeIdentifier> nodeIds;
@@ -45,21 +51,38 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     private final Runnable completedResultFetchedCallback;
     private final long creationTimeNanos;
     private final boolean merge;
+    private final AtomicInteger responseBufferLeft;
 
     private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>();
     private final AtomicInteger requestsCompleted = new AtomicInteger(0);
 
     private NodeResponse mergedResponse; // guarded by synchronizing on this
     private RuntimeException failure; // guarded by synchronizing on this
+    private volatile String phase;
+    private volatile long phaseStartTime = System.nanoTime();
+    private final long creationTime = System.nanoTime();
+
+    private final Map<String, Long> timingInfo = new LinkedHashMap<>();
+
+    public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds, final HttpResponseMapper responseMapper,
+        final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
+        this(id, uri, method, nodeIds, responseMapper, completionCallback, completedResultFetchedCallback, merge, DEFAULT_RESPONSE_BUFFER_SIZE);
+    }
 
-    public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds,
-                                        final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
+    public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds, final HttpResponseMapper responseMapper,
+        final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge, final int responseBufferSize) {
         this.id = id;
         this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
         this.uri = uri;
         this.method = method;
         this.merge = merge;
 
+        if ("POST".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) {
+            phase = VERIFICATION_PHASE;
+        } else {
+            phase = ONLY_PHASE;
+        }
+
         creationTimeNanos = System.nanoTime();
         for (final NodeIdentifier nodeId : nodeIds) {
             responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
@@ -68,8 +91,51 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
         this.responseMapper = responseMapper;
         this.completionCallback = completionCallback;
         this.completedResultFetchedCallback = completedResultFetchedCallback;
+        this.responseBufferLeft = new AtomicInteger(responseBufferSize);
     }
 
+    public boolean requestBuffer(final int size) {
+        boolean updated = false;
+        while (!updated) {
+            final int bytesLeft = responseBufferLeft.get();
+            if (bytesLeft < size) {
+                return false;
+            }
+
+            updated = responseBufferLeft.compareAndSet(bytesLeft, bytesLeft - size);
+        }
+
+        return true;
+    }
+
+    public void setPhase(final String phase) {
+        this.phase = phase;
+        phaseStartTime = System.nanoTime();
+    }
+
+    public synchronized void addTiming(final String description, final String node, final long nanos) {
+        final StringBuilder sb = new StringBuilder(description);
+        if (phase != ONLY_PHASE) {
+            sb.append(" (").append(phase).append(")");
+        }
+        sb.append(" for ").append(node);
+        timingInfo.put(sb.toString(), nanos);
+    }
+
+    private synchronized void logTimingInfo() {
+        if (!logger.isDebugEnabled()) {
+            return;
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        sb.append(String.format("For %s %s Timing Info is as follows:\n", method, uri));
+        for (final Map.Entry<String, Long> entry : timingInfo.entrySet()) {
+            sb.append(entry.getKey()).append(" took ").append(TimeUnit.NANOSECONDS.toMillis(entry.getValue())).append(" millis\n");
+        }
+        logger.debug(sb.toString());
+    }
+
+
     @Override
     public String getRequestIdentifier() {
         return id;
@@ -148,7 +214,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
             .map(p -> p.getResponse())
             .filter(response -> response != null)
             .collect(Collectors.toSet());
+
+        final long start = System.nanoTime();
         mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge);
+        final long nanos = System.nanoTime() - start;
+        addTiming("Map/Merge Responses", "All Nodes", nanos);
 
         logger.debug("Notifying all that merged response is complete for {}", id);
         this.notifyAll();
@@ -169,6 +239,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
             }
         }
 
+        logTimingInfo();
+
         return getMergedResponse(true);
     }
 
@@ -195,6 +267,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
             }
         }
 
+        logTimingInfo();
+
         return getMergedResponse(true);
     }
 
@@ -217,10 +291,18 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
 
         if (completedCount == responseMap.size()) {
             logger.debug("Notifying all that merged response is ready for {}", id);
+            addTiming("Phase Completed", "All Nodes", System.nanoTime() - phaseStartTime);
+
+            final long start = System.nanoTime();
+
             synchronized (this) {
                 this.notifyAll();
             }
 
+            final long nanos = System.nanoTime() - start;
+            timingInfo.put("Notifying All Threads that Request is Complete", nanos);
+            timingInfo.put("Total Time for All Nodes", System.nanoTime() - creationTime);
+
             if (completionCallback != null) {
                 completionCallback.onCompletion(this);
             }
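
A note on the hunk above: requestBuffer(int) reserves space from a shared response buffer with a compare-and-set loop on an AtomicInteger, so concurrent node responses can claim buffer space without locking. A minimal standalone sketch of the same pattern (the class and method names here are hypothetical, not part of the commit):

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the lock-free reservation loop in requestBuffer(int).
// BufferBudget and tryReserve are hypothetical names; only the CAS loop mirrors the commit.
class BufferBudget {
    private final AtomicInteger bytesLeft;

    BufferBudget(final int maxBytes) {
        this.bytesLeft = new AtomicInteger(maxBytes);
    }

    boolean tryReserve(final int size) {
        while (true) {
            final int current = bytesLeft.get();
            if (current < size) {
                return false;   // not enough budget left; the caller streams the response instead
            }
            if (bytesLeft.compareAndSet(current, current - size)) {
                return true;    // reservation succeeded
            }
            // another thread updated the counter first; re-read and retry
        }
    }
}

A failed reservation never blocks; it simply tells the caller to fall back to streaming the response.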

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 3b4470f..bc2f8bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -250,10 +250,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
             // time, so we use a write lock if the request is mutable and a read lock otherwise.
             final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
-            logger.debug("Obtaining lock {} in order to replicate request {} {}", method, uri);
+            logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
             lock.lock();
             try {
-                logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
+                logger.debug("Lock {} obtained in order to replicate request {} {}", lock, method, uri);
 
                 // Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after
                 // the entire request has completed.
@@ -316,6 +316,36 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
             }
 
+            // Update headers to indicate the current revision so that we can
+            // prevent multiple users changing the flow at the same time
+            final Map<String, String> updatedHeaders = new HashMap<>(headers);
+            final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
+
+            // create a response object if one was not already passed to us
+            if (response == null) {
+                // create the request objects and replicate to all nodes.
+                // When the request has completed, we need to ensure that we notify the monitor, if there is one.
+                final CompletionCallback completionCallback = clusterResponse -> {
+                    try {
+                        onCompletedResponse(requestId);
+                    } finally {
+                        if (monitor != null) {
+                            synchronized (monitor) {
+                                monitor.notify();
+                            }
+
+                            logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
+                        }
+                    }
+                };
+
+                final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
+
+                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
+                    responseMapper, completionCallback, responseConsumedCallback, merge);
+                responseMap.put(requestId, response);
+            }
+
             // verify all of the nodes exist and are in the proper state
             for (final NodeIdentifier nodeId : nodeIds) {
                 final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
@@ -330,13 +360,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
             logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
 
-            // Update headers to indicate the current revision so that we can
-            // prevent multiple users changing the flow at the same time
-            final Map<String, String> updatedHeaders = new HashMap<>(headers);
-            final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
-
             if (performVerification) {
+                final long start = System.nanoTime();
                 verifyClusterState(method, uri.getPath());
+                final long nanos = System.nanoTime() - start;
+                response.addTiming("Verify Cluster State", "All Nodes", nanos);
             }
 
             int numRequests = responseMap.size();
@@ -354,31 +382,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
             }
 
-            // create the request objects and replicate to all nodes.
-            // When the request has completed, we need to ensure that we notify the monitor, if there is one.
-            final CompletionCallback completionCallback = clusterResponse -> {
-                try {
-                    onCompletedResponse(requestId);
-                } finally {
-                    if (monitor != null) {
-                        synchronized (monitor) {
-                            monitor.notify();
-                        }
-
-                        logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
-                    }
-                }
-            };
-
-            final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
-
-            // create a response object if one was not already passed to us
-            if (response == null) {
-                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
-                    responseMapper, completionCallback, responseConsumedCallback, merge);
-                responseMap.put(requestId, response);
-            }
-
             logger.debug("For Request ID {}, response object is {}", requestId, response);
 
             // if mutable request, we have to do a two-phase commit where we ask each node to verify
@@ -391,6 +394,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
                 performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
                 return response;
+            } else if (mutableRequest) {
+                response.setPhase(StandardAsyncClusterResponse.COMMIT_PHASE);
             }
 
             // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
@@ -407,8 +412,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
             // replicate the request to all nodes
             final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                    nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
-            replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
+                nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse);
+            submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
 
             return response;
         } catch (final Throwable t) {
@@ -431,6 +436,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         final Map<String, String> validationHeaders = new HashMap<>(headers);
         validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
 
+        final long startNanos = System.nanoTime();
         final int numNodes = nodeIds.size();
         final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() {
             final Set<NodeResponse> nodeResponses = Collections.synchronizedSet(new HashSet<>());
@@ -450,9 +456,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 }
 
                 try {
+                    final long nanos = System.nanoTime() - startNanos;
+                    clusterResponse.addTiming("Completed Verification", nodeResponse.getNodeId().toString(), nanos);
+
                     // If we have all of the node responses, then we can verify the responses
                     // and if good replicate the original request to all of the nodes.
                     if (allNodesResponded) {
+                        clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
+
                         // Check if we have any requests that do not have a 150-Continue status code.
                         final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
 
@@ -473,9 +484,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                                     logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
 
                                     final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                                            nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
+                                        nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null, clusterResponse);
 
-                                    replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
+                                    submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                                 }
                             });
                             cancelLockThread.setName("Cancel Flow Locks");
@@ -547,10 +558,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         };
 
         // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
-        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback);
+        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback,
+            clusterResponse);
 
         // replicate the 'verification request' to all nodes
-        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
+        submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders);
     }
 
 
@@ -566,7 +578,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     // Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests
     protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId,
-                                            final Map<String, String> headers) {
+        final Map<String, String> headers, final StandardAsyncClusterResponse clusterResponse) {
         final ClientResponse clientResponse;
         final long startNanos = System.nanoTime();
         logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers);
@@ -594,7 +606,20 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication.");
         }
 
-        return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
+        final long nanos = System.nanoTime() - startNanos;
+        clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos);
+        final NodeResponse nodeResponse = new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
+        if (nodeResponse.is2xx()) {
+            final int length = nodeResponse.getClientResponse().getLength();
+            if (length > 0) {
+                final boolean canBufferResponse = clusterResponse.requestBuffer(length);
+                if (canBufferResponse) {
+                    nodeResponse.bufferResponse();
+                }
+            }
+        }
+
+        return nodeResponse;
     }
 
     private boolean isMutableRequest(final String method, final String uriPath) {
@@ -708,7 +733,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     }
 
 
-    private void replicateRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
+    private void submitAsyncRequest(final Set<NodeIdentifier> nodeIds, final String scheme, final String path,
                                   final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final Map<String, String> headers) {
 
         if (nodeIds.isEmpty()) {
@@ -746,20 +771,26 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         private final Object entity;
         private final Map<String, String> headers = new HashMap<>();
         private final NodeRequestCompletionCallback callback;
+        private final StandardAsyncClusterResponse clusterResponse;
+        private final long creationNanos = System.nanoTime();
 
-        private NodeHttpRequest(final NodeIdentifier nodeId, final String method,
-                                final URI uri, final Object entity, final Map<String, String> headers, final NodeRequestCompletionCallback callback) {
+        private NodeHttpRequest(final NodeIdentifier nodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers,
+            final NodeRequestCompletionCallback callback, final StandardAsyncClusterResponse clusterResponse) {
             this.nodeId = nodeId;
             this.method = method;
             this.uri = uri;
             this.entity = entity;
             this.headers.putAll(headers);
             this.callback = callback;
+            this.clusterResponse = clusterResponse;
         }
 
 
         @Override
         public void run() {
+            final long waitForScheduleNanos = System.nanoTime() - creationNanos;
+            clusterResponse.addTiming("Wait for HTTP Request Replication to be triggered", nodeId.toString(), waitForScheduleNanos);
+
             NodeResponse nodeResponse;
 
             try {
@@ -768,7 +799,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 final String requestId = headers.get("x-nifi-request-id");
 
                 logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);
-                nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers);
+                nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers, clusterResponse);
             } catch (final Exception e) {
                 nodeResponse = new NodeResponse(nodeId, method, uri, e);
                 logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e);
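
A note on the replicateRequest(...) change above: a node response is buffered only when the status is 2xx, the Content-Length is known, and the shared buffer budget still has room. A minimal sketch of that decision, where tryReserve stands in for StandardAsyncClusterResponse.requestBuffer(int) and bufferBody for NodeResponse.bufferResponse() (both stand-ins are hypothetical):

import java.util.function.IntPredicate;

// Minimal sketch of the conditional-buffering decision added to replicateRequest(...).
// tryReserve and bufferBody are hypothetical stand-ins for requestBuffer(int) and bufferResponse().
class BufferingDecision {
    static void maybeBuffer(final int statusCode, final int contentLength,
                            final IntPredicate tryReserve, final Runnable bufferBody) {
        final boolean is2xx = statusCode >= 200 && statusCode <= 299;
        if (is2xx && contentLength > 0 && tryReserve.test(contentLength)) {
            // buffer only when the length is known and fits within the remaining shared budget;
            // otherwise the entity stays on the live stream and can be consumed only once
            bufferBody.run();
        }
    }
}

Responses whose length is unknown or too large are never buffered, which keeps the total memory used for replicated responses bounded.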

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 308652e..7c911b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -16,8 +16,9 @@
  */
 package org.apache.nifi.cluster.manager;
 
-import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.List;
@@ -34,6 +35,7 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.web.api.entity.Entity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +61,12 @@ public class NodeResponse {
     private final URI requestUri;
     private final ClientResponse clientResponse;
     private final NodeIdentifier nodeId;
-    private final Throwable throwable;
+    private Throwable throwable;
     private boolean hasCreatedResponse = false;
     private final Entity updatedEntity;
     private final long requestDurationNanos;
     private final String requestId;
+    private byte[] bufferedResponse;
 
     public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) {
         if (nodeId == null) {
@@ -158,6 +161,23 @@ public class NodeResponse {
         return (500 <= statusCode && statusCode <= 599);
     }
 
+    public synchronized void bufferResponse() {
+        bufferedResponse = new byte[clientResponse.getLength()];
+        try {
+            StreamUtils.fillBuffer(clientResponse.getEntityInputStream(), bufferedResponse);
+        } catch (final IOException e) {
+            this.throwable = e;
+        }
+    }
+
+    private synchronized InputStream getInputStream() {
+        if (bufferedResponse == null) {
+            return clientResponse.getEntityInputStream();
+        }
+
+        return new ByteArrayInputStream(bufferedResponse);
+    }
+
     public ClientResponse getClientResponse() {
         return clientResponse;
     }
@@ -229,7 +249,6 @@ public class NodeResponse {
         for (final String key : clientResponse.getHeaders().keySet()) {
             final List<String> values = clientResponse.getHeaders().get(key);
             for (final String value : values) {
-
                 if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) {
                     /*
                      * do not copy the transfer-encoding header (i.e., chunked encoding) or
@@ -244,25 +263,19 @@ public class NodeResponse {
                      */
                     continue;
                 }
+
                 responseBuilder.header(key, value);
             }
         }
 
         // head requests must not have a message-body in the response
         if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) {
-
             // set the entity
             if (updatedEntity == null) {
                 responseBuilder.entity(new StreamingOutput() {
                     @Override
                     public void write(final OutputStream output) throws IOException, WebApplicationException {
-                        BufferedInputStream bis = null;
-                        try {
-                            bis = new BufferedInputStream(clientResponse.getEntityInputStream());
-                            IOUtils.copy(bis, output);
-                        } finally {
-                            IOUtils.closeQuietly(bis);
-                        }
+                        IOUtils.copy(getInputStream(), output);
                     }
                 });
             } else {
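
A note on the NodeResponse changes above: bufferResponse() drains the node's entity stream into a byte array once, and getInputStream() later serves either that buffered copy or the live stream. A minimal standalone sketch of the same buffer-or-stream fallback, using plain java.io in place of NiFi's StreamUtils (the class name is hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Minimal sketch of the buffer-or-stream pattern used by NodeResponse.
// The class name is hypothetical; plain java.io replaces NiFi's StreamUtils here.
class BufferableBody {
    private final InputStream liveStream;
    private byte[] buffered;

    BufferableBody(final InputStream liveStream) {
        this.liveStream = liveStream;
    }

    // Eagerly read the whole body into memory so it can be served later without the connection.
    synchronized void buffer() throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final byte[] chunk = new byte[8192];
        int read;
        while ((read = liveStream.read(chunk)) != -1) {
            baos.write(chunk, 0, read);
        }
        buffered = baos.toByteArray();
    }

    // Serve the buffered copy if present; otherwise fall back to the one-shot live stream.
    synchronized InputStream getInputStream() {
        return buffered == null ? liveStream : new ByteArrayInputStream(buffered);
    }
}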

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 88a8836..018bf93 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -234,7 +234,7 @@ public class TestThreadPoolRequestReplicator {
                 = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
-                                                    final URI uri, final String requestId, Map<String, String> givenHeaders) {
+                    final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
                 // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
                 final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -364,7 +364,7 @@ public class TestThreadPoolRequestReplicator {
                 = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
-                                                    final URI uri, final String requestId, Map<String, String> givenHeaders) {
+                    final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
                 // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata");
                 final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -574,7 +574,7 @@ public class TestThreadPoolRequestReplicator {
         final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
-                                                    final URI uri, final String requestId, Map<String, String> givenHeaders) {
+                final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
                 if (delayMillis > 0L) {
                     try {
                         Thread.sleep(delayMillis);

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 7c74680..20dbfe1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
@@ -62,6 +63,7 @@ import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.io.socket.ServerSocketConfiguration;
 import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
@@ -118,6 +120,9 @@ public class Node {
             }
         };
 
+        final Bundle systemBundle = ExtensionManager.createSystemBundle(properties);
+        ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
+
         revisionManager = Mockito.mock(RevisionManager.class);
         Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
new file mode 100644
index 0000000..49a02cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.cluster.integration.NopStateProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 088f26d..a1d5173 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -57,7 +57,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
 
     private StandardFlowFileRecord(final Builder builder) {
         this.id = builder.bId;
-        this.attributes = builder.bAttributes;
+        this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes;
         this.entryDate = builder.bEntryDate;
         this.lineageStartDate = builder.bLineageStartDate;
         this.lineageStartIndex = builder.bLineageStartIndex;
@@ -176,11 +176,12 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         private final Set<String> bLineageIdentifiers = new HashSet<>();
         private long bPenaltyExpirationMs = -1L;
         private long bSize = 0L;
-        private final Map<String, String> bAttributes = new HashMap<>();
         private ContentClaim bClaim = null;
         private long bClaimOffset = 0L;
         private long bLastQueueDate = System.currentTimeMillis();
         private long bQueueDateIndex = 0L;
+        private Map<String, String> bAttributes;
+        private boolean bAttributesCopied = false;
 
         public Builder id(final long id) {
             bId = id;
@@ -210,14 +211,28 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             return this;
         }
 
+        private Map<String, String> initializeAttributes() {
+            if (bAttributes == null) {
+                bAttributes = new HashMap<>();
+                bAttributesCopied = true;
+            } else if (!bAttributesCopied) {
+                bAttributes = new HashMap<>(bAttributes);
+                bAttributesCopied = true;
+            }
+
+            return bAttributes;
+        }
+
         public Builder addAttribute(final String key, final String value) {
             if (key != null && value != null) {
-                bAttributes.put(FlowFile.KeyValidator.validateKey(key), value);
+                initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value);
             }
             return this;
         }
 
         public Builder addAttributes(final Map<String, String> attributes) {
+            final Map<String, String> initializedAttributes = initializeAttributes();
+
             if (null != attributes) {
                 for (final String key : attributes.keySet()) {
                     FlowFile.KeyValidator.validateKey(key);
@@ -226,7 +241,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
                     final String key = entry.getKey();
                     final String value = entry.getValue();
                     if (key != null && value != null) {
-                        bAttributes.put(key, value);
+                        initializedAttributes.put(key, value);
                     }
                 }
             }
@@ -240,7 +255,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
                         continue;
                     }
 
-                    bAttributes.remove(key);
+                    initializeAttributes().remove(key);
                 }
             }
             return this;
@@ -253,7 +268,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
                         continue;
                     }
 
-                    bAttributes.remove(key);
+                    initializeAttributes().remove(key);
                 }
             }
             return this;
@@ -261,7 +276,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
 
         public Builder removeAttributes(final Pattern keyPattern) {
             if (keyPattern != null) {
-                final Iterator<String> iterator = bAttributes.keySet().iterator();
+                final Iterator<String> iterator = initializeAttributes().keySet().iterator();
                 while (iterator.hasNext()) {
                     final String key = iterator.next();
 
@@ -304,7 +319,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             bLineageIdentifiers.clear();
             bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
             bSize = specFlowFile.getSize();
-            bAttributes.putAll(specFlowFile.getAttributes());
+            bAttributes = specFlowFile.getAttributes();
+            bAttributesCopied = false;
             bClaim = specFlowFile.getContentClaim();
             bClaimOffset = specFlowFile.getContentClaimOffset();
             bLastQueueDate = specFlowFile.getLastQueueDate();
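
A note on the Builder changes above: the Builder now keeps a reference to the source FlowFile's attribute map and copies it only on the first mutation, so fromFlowFile(...).build() with no attribute changes never allocates a new map. A minimal standalone sketch of that copy-on-write bookkeeping (the class name is hypothetical):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the lazy copy-on-write bookkeeping added to StandardFlowFileRecord.Builder.
// The class name is hypothetical; the copied/not-copied flag mirrors bAttributesCopied in the commit.
class CopyOnWriteAttributes {
    private Map<String, String> attributes;   // may alias the source map until the first write
    private boolean copied = false;

    void from(final Map<String, String> source) {
        attributes = source;   // share the source map; no copy yet
        copied = false;
    }

    private Map<String, String> writable() {
        if (attributes == null) {
            attributes = new HashMap<>();
            copied = true;
        } else if (!copied) {
            attributes = new HashMap<>(attributes);  // copy only on the first mutation
            copied = true;
        }
        return attributes;
    }

    void put(final String key, final String value) {
        writable().put(key, value);
    }

    Map<String, String> build() {
        return attributes == null ? Collections.emptyMap() : attributes;
    }
}

A FlowFile built purely from another FlowFile therefore shares the parent's attribute map, and a copy is made only when an attribute is actually added or removed.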

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java
new file mode 100644
index 0000000..55c0e3e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardFlowFileRecord.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+public class TestStandardFlowFileRecord {
+
+    @Test
+    public void testAttributeCopiedOnModification() {
+        final FlowFileRecord original = new StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", UUID.randomUUID().toString())
+            .addAttribute("abc", "xyz")
+            .build();
+
+        final FlowFileRecord addAttribute = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(original)
+            .addAttribute("hello", "good-bye")
+            .build();
+
+        final Map<String, String> addAttributeMapCopy = new HashMap<>(addAttribute.getAttributes());
+
+        assertEquals("good-bye", addAttribute.getAttributes().get("hello"));
+        assertEquals("xyz", addAttribute.getAttributes().get("abc"));
+
+        assertEquals("xyz", original.getAttributes().get("abc"));
+        assertFalse(original.getAttributes().containsKey("hello"));
+
+        final FlowFileRecord removeAttribute = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(addAttribute)
+            .removeAttributes("hello")
+            .build();
+
+        assertEquals(original.getAttributes(), removeAttribute.getAttributes());
+        assertEquals(addAttributeMapCopy, addAttribute.getAttributes());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
index 7d548c9..11573a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
@@ -16,17 +16,20 @@
  */
 package org.apache.nifi.web;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Aspect to limit access into the core.
  */
 @Aspect
 public class NiFiServiceFacadeLock {
+    private static final Logger logger = LoggerFactory.getLogger(NiFiServiceFacadeLock.class);
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
@@ -36,165 +39,128 @@ public class NiFiServiceFacadeLock {
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* create*(..))")
     public Object createLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
         + "execution(* clear*(..))")
     public Object clearLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* delete*(..))")
     public Object deleteLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* remove*(..))")
     public Object removeLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* update*(..))")
     public Object updateLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* set*(..))")
     public Object setLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* copy*(..))")
     public Object copyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* import*(..))")
     public Object importLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* export*(..))")
     public Object exportLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* submit*(..))")
     public Object submitLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* schedule*(..))")
     public Object scheduleLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        writeLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            writeLock.unlock();
-        }
+        return proceedWithWriteLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* get*(..))")
     public Object getLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        readLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            readLock.unlock();
-        }
+        return proceedWithReadLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* is*(..))")
     public Object isLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        readLock.lock();
-        try {
-            return proceedingJoinPoint.proceed();
-        } finally {
-            readLock.unlock();
-        }
+        return proceedWithReadLock(proceedingJoinPoint);
     }
 
     @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
             + "execution(* search*(..))")
     public Object searchLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        return proceedWithReadLock(proceedingJoinPoint);
+    }
+
+    @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+        + "execution(* verify*(..))")
+    public Object verifyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        return proceedWithReadLock(proceedingJoinPoint);
+    }
+
+
+    private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        final long beforeLock = System.nanoTime();
+        long afterLock = 0L;
+
         readLock.lock();
         try {
+            afterLock = System.nanoTime();
             return proceedingJoinPoint.proceed();
         } finally {
             readLock.unlock();
+
+            final long afterProcedure = System.nanoTime();
+            final String procedure = proceedingJoinPoint.getSignature().toLongString();
+            logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Read Lock {} and {} nanos to invoke the method",
+                procedure, afterLock - beforeLock, readLock, afterProcedure - afterLock);
         }
     }
 
-    @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
-            + "execution(* verify*(..))")
-    public Object verifyLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        readLock.lock();
+    private Object proceedWithWriteLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        final long beforeLock = System.nanoTime();
+        long afterLock = 0L;
+
+        writeLock.lock();
         try {
+            afterLock = System.nanoTime();
             return proceedingJoinPoint.proceed();
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
+
+            final long afterProcedure = System.nanoTime();
+            final String procedure = proceedingJoinPoint.getSignature().toLongString();
+            logger.debug("In order to perform procedure {}, it took {} nanos to obtain the Write Lock {} and {} nanos to invoke the method",
+                procedure, afterLock - beforeLock, writeLock, afterProcedure - afterLock);
         }
     }
+
 }
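
A note on the aspect changes above: the near-identical advice bodies are collapsed into proceedWithReadLock and proceedWithWriteLock, which also record how long the lock took to acquire and how long the advised method ran. A minimal generic version of the same timed lock-and-proceed helper, written against java.util.concurrent.locks.Lock rather than AspectJ (the names here are hypothetical):

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal sketch of the timed lock-and-proceed helper that proceedWithReadLock/proceedWithWriteLock implement.
// Names are hypothetical; System.out.printf stands in for the aspect's logger.debug(...) call.
class TimedLocking {
    static <T> T withLock(final Lock lock, final String description, final Callable<T> body) throws Exception {
        final long beforeLock = System.nanoTime();
        lock.lock();
        final long afterLock = System.nanoTime();
        try {
            return body.call();
        } finally {
            lock.unlock();
            final long afterBody = System.nanoTime();
            System.out.printf("%s: %d nanos to obtain lock, %d nanos to run%n",
                description, afterLock - beforeLock, afterBody - afterLock);
        }
    }

    public static void main(final String[] args) throws Exception {
        final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        withLock(rwLock.readLock(), "read something", () -> "result");
        withLock(rwLock.writeLock(), "write something", () -> null);
    }
}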

http://git-wip-us.apache.org/repos/asf/nifi/blob/3aa1db6e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index c159273..455380f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -978,10 +978,21 @@ public abstract class ApplicationResource {
 
         // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
         // to the cluster nodes themselves.
-        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
-            return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
-        } else {
-            return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
+        final long replicateStart = System.nanoTime();
+        String action = null;
+        try {
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                action = "Replicate Request " + method + " " + path;
+                return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
+            } else {
+                action = "Forward Request " + method + " " + path + " to Coordinator";
+                return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse();
+            }
+        } finally {
+            final long replicateNanos = System.nanoTime() - replicateStart;
+            final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+            final String requestId = transactionId == null ? "Request with no ID" : transactionId;
+            logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
         }
     }
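
A note on the ApplicationResource change above: the replicate/forward call is wrapped in a try/finally that records which action was taken and logs the total duration even when awaitMergedResponse throws. A minimal sketch of that pattern (doReplicate and doForward are hypothetical placeholders for the real requestReplicator calls):

import java.util.concurrent.TimeUnit;

// Minimal sketch of the "label the action, time it in finally" pattern used in ApplicationResource.replicate(...).
// doReplicate/doForward are hypothetical placeholders; System.out stands in for logger.debug(...).
class ReplicationTiming {
    static String replicateOrForward(final boolean toClusterNodes) {
        final long start = System.nanoTime();
        String action = null;
        try {
            if (toClusterNodes) {
                action = "Replicate Request";
                return doReplicate();
            } else {
                action = "Forward Request to Coordinator";
                return doForward();
            }
        } finally {
            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // the log line fires even if doReplicate/doForward throws
            System.out.println("Took a total of " + millis + " millis to " + action);
        }
    }

    private static String doReplicate() { return "replicated"; }
    private static String doForward() { return "forwarded"; }
}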