You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/11 12:13:52 UTC

[1/2] nifi git commit: NIFI-2185: Proxy requests through the cluster coordinator rather than making use of distributed read/write locks. This closes #621

Repository: nifi
Updated Branches:
  refs/heads/master b836db21a -> cf183e15e


http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 794c5a1..e340623 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -16,7 +16,24 @@
  */
 package org.apache.nifi.web;
 
-import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -39,6 +56,8 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -52,25 +71,11 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.concurrent.LockExpiredException;
 import org.apache.nifi.web.util.ClientResponseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 /**
  * Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
@@ -274,6 +279,18 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
         return componentFacade.updateComponent(requestContext, annotationData, properties);
     }
 
+
+    private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
+        final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+        if (coordinatorNode == null) {
+            throw new NoClusterCoordinatorException();
+        }
+
+        final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
+        return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse();
+    }
+
+
     /**
      * Facade over accessing different types of NiFi components.
      */
@@ -331,7 +348,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }
@@ -396,7 +413,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, processorEntity, headers).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.PUT, requestUrl, processorEntity, headers);
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }
@@ -412,20 +429,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 processor = entity.getComponent();
             } else {
                 // update processor within write lock
-                final String writeLockId = serviceFacade.obtainWriteLock();
-                try {
-                    processor = serviceFacade.withWriteLock(writeLockId, () -> {
-                        ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
-                        final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
-                        return entity.getComponent();
-                    });
-                } finally {
-                    // ensure the lock is released
-                    try {
-                        serviceFacade.releaseWriteLock(writeLockId);
-                    } catch (final LockExpiredException e) {
-                    }
-                }
+                ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
+                final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
+                processor = entity.getComponent();
             }
 
             // return the processor info
@@ -530,7 +536,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }
@@ -569,20 +575,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 controllerServiceDto.setAnnotationData(annotationData);
                 controllerServiceDto.setProperties(properties);
 
-                // update controller service within write lock
-                final String writeLockId = serviceFacade.obtainWriteLock();
-                try {
-                    controllerService = serviceFacade.withWriteLock(writeLockId, () -> {
-                        final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
-                        return entity.getComponent();
-                    });
-                } finally {
-                    // ensure the lock is released
-                    try {
-                        serviceFacade.releaseWriteLock(writeLockId);
-                    } catch (final LockExpiredException e) {
-                    }
-                }
+                // update controller service
+                final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
+                controllerService = entity.getComponent();
             } else {
                 // if this is a standalone instance the service should have been found above... there should
                 // no cluster to replicate the request to
@@ -628,7 +623,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers);
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }
@@ -702,7 +697,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }
@@ -742,20 +737,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 reportingTaskDto.setProperties(properties);
 
                 // obtain write lock
-                final String writeLockId = serviceFacade.obtainWriteLock();
-                try {
-                    reportingTask = serviceFacade.withWriteLock(writeLockId, () -> {
-                        serviceFacade.verifyRevision(revision, user);
-                        final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
-                        return entity.getComponent();
-                    });
-                } finally {
-                    // ensure the lock is released
-                    try {
-                        serviceFacade.releaseWriteLock(writeLockId);
-                    } catch (final LockExpiredException e) {
-                    }
-                }
+                serviceFacade.verifyRevision(revision, user);
+                final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
+                reportingTask = entity.getComponent();
             } else {
                 // if this is a standalone instance the task should have been found above... there should
                 // no cluster to replicate the request to
@@ -801,7 +785,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
                 // replicate request
                 NodeResponse nodeResponse;
                 try {
-                    nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers).awaitMergedResponse();
+                    nodeResponse = replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers);
                 } catch (final InterruptedException e) {
                     throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 6fdca03..42f836c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -49,6 +49,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.Snippet;
@@ -61,7 +63,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.SnippetDTO;
 import org.apache.nifi.web.api.entity.ComponentEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.concurrent.LockExpiredException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -296,7 +297,9 @@ public abstract class ApplicationResource {
 
         final Map<String, String> result = new HashMap<>();
         final Map<String, String> overriddenHeadersIgnoreCaseMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-        overriddenHeadersIgnoreCaseMap.putAll(overriddenHeaders);
+        if (overriddenHeaders != null) {
+            overriddenHeadersIgnoreCaseMap.putAll(overriddenHeaders);
+        }
 
         final Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
         while (headerNames.hasMoreElements()) {
@@ -346,10 +349,6 @@ public abstract class ApplicationResource {
         return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
     }
 
-    protected boolean isLockCancelationPhase(final HttpServletRequest httpServletRequest) {
-        return httpServletRequest.getHeader(RequestReplicator.LOCK_CANCELATION_HEADER) != null;
-    }
-
     /**
      * Checks whether or not the request should be replicated to the cluster
      *
@@ -482,59 +481,21 @@ public abstract class ApplicationResource {
             final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
         final Runnable verifyRevision) {
 
-        if (isLockCancelationPhase(httpServletRequest)) {
-            final String lockVersionId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
-            try {
-                serviceFacade.releaseWriteLock(lockVersionId);
-            } catch (final Exception e) {
-                // If the lock has expired, then it has already been unlocked.
-            }
-
-            return generateOkResponse().build();
+        final boolean validationPhase = isValidationPhase(httpServletRequest);
+        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+            // authorize access
+            serviceFacade.authorizeAccess(authorizer);
+            verifyRevision.run();
         }
 
-        String lockId = null;
-        try {
-            final boolean validationPhase = isValidationPhase(httpServletRequest);
-            if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-                // authorize access
-                serviceFacade.authorizeAccess(authorizer);
-
-                lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
-                lockId = serviceFacade.obtainWriteLock(lockId);
-                verifyRevision.run();
-            } else {
-                lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
-            }
-
-            if (validationPhase) {
-                if (verifier != null) {
-                    verifier.run();
-                }
-                return generateContinueResponse().build();
-            }
-
-            try {
-                return serviceFacade.withWriteLock(lockId, () -> action.get());
-            } finally {
-                try {
-                    serviceFacade.releaseWriteLock(lockId);
-                } catch (final LockExpiredException e) {
-                    // If the lock expires here, it's okay. We've already completed our action,
-                    // so the expiration of the lock is of no consequence to us.
-                }
-            }
-        } catch (final RuntimeException t) {
-            if (lockId != null) {
-                try {
-                    serviceFacade.releaseWriteLock(lockId);
-                } catch (final Exception e) {
-                    t.addSuppressed(e);
-                }
+        if (validationPhase) {
+            if (verifier != null) {
+                verifier.run();
             }
-
-            throw t;
+            return generateContinueResponse().build();
         }
+
+        return action.get();
     }
 
     /**
@@ -582,16 +543,57 @@ public abstract class ApplicationResource {
             throw new UnknownNodeException("Cannot replicate request " + method + " " + getAbsolutePath() + " to node with ID " + nodeUuid + " because the specified node does not exist.");
         }
 
-        final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
         final URI path = getAbsolutePath();
         try {
             final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
-            return requestReplicator.replicate(targetNodes, method, path, entity, headers).awaitMergedResponse().getResponse();
+
+            // Determine if we should replicate to the node directly or if we should replicate to the Cluster Coordinator,
+            // and have it replicate the request on our behalf.
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                // If we are to replicate directly to the nodes, we need to indicate that the replication source is
+                // the cluster coordinator so that the node knows to service the request.
+                final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
+                return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse();
+            } else {
+                headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+                return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method,
+                    path, entity, headers, false).awaitMergedResponse().getResponse();
+            }
         } catch (final InterruptedException ie) {
             return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
         }
     }
 
+    protected NodeIdentifier getClusterCoordinatorNode() {
+        final NodeIdentifier activeClusterCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
+        if (activeClusterCoordinator != null) {
+            return activeClusterCoordinator;
+        }
+
+        throw new NoClusterCoordinatorException();
+    }
+
+    protected ReplicationTarget getReplicationTarget() {
+        return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
+    }
+
+    protected Response replicate(final String method, final NodeIdentifier targetNode) {
+        try {
+            // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+            // to the cluster nodes themselves.
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
+                return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(true), getHeaders(), true).awaitMergedResponse().getResponse();
+            } else {
+                final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+                final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
+                return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(true), headers, false).awaitMergedResponse().getResponse();
+            }
+        } catch (final InterruptedException ie) {
+            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
+        }
+    }
+
     /**
      * Convenience method for calling {@link #replicate(String, Object)} with an entity of
      * {@link #getRequestParameters() getRequestParameters(true)}
@@ -604,6 +606,18 @@ public abstract class ApplicationResource {
     }
 
     /**
+     * Convenience method for calling {@link #replicateNodeResponse(String, Object, Map)} with an entity of
+     * {@link #getRequestParameters() getRequestParameters(true)} and overriding no headers
+     *
+     * @param method the HTTP method to use
+     * @return the response from the request
+     * @throws InterruptedException if interrupted while replicating the request
+     */
+    protected NodeResponse replicateNodeResponse(final String method) throws InterruptedException {
+        return replicateNodeResponse(method, getRequestParameters(true), (Map<String, String>) null);
+    }
+
+    /**
      * Replicates the request to all nodes in the cluster using the provided method and entity. The headers
      * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
      * that provided by the {@link #getAbsolutePath()} method
@@ -621,18 +635,45 @@ public abstract class ApplicationResource {
      * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
      * that provided by the {@link #getAbsolutePath()} method
      *
-     * @param method            the HTTP method to use
-     * @param entity            the entity to replicate
+     * @param method the HTTP method to use
+     * @param entity the entity to replicate
      * @param headersToOverride the headers to override
      * @return the response from the request
+     * @see #replicateNodeResponse(String, Object, Map)
      */
     protected Response replicate(final String method, final Object entity, final Map<String, String> headersToOverride) {
-        final URI path = getAbsolutePath();
         try {
-            final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
-            return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse().getResponse();
+            return replicateNodeResponse(method, entity, headersToOverride).getResponse();
         } catch (final InterruptedException ie) {
-            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
+            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
+        }
+    }
+
+    /**
+     * Replicates the request using the provided method and entity: directly to the cluster nodes when this node
+     * is the active cluster coordinator, otherwise only to the cluster coordinator. The headers used are those from
+     * {@link #getHeaders()}, with any {@code headersToOverride} applied; the URI is that provided by
+     * {@link #getAbsolutePath()}. This method returns the NodeResponse, rather than a Response object.
+     *
+     * @param method the HTTP method to use
+     * @param entity the entity to replicate
+     * @param headersToOverride the headers to override
+     *
+     * @return the response from the request
+     *
+     * @throws InterruptedException if interrupted while replicating the request
+     * @see #replicate(String, Object, Map)
+     */
+    protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+        final URI path = getAbsolutePath();
+        final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
+
+        // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+        // to the cluster nodes themselves.
+        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+            return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
+        } else {
+            return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse();
         }
     }
 
@@ -667,4 +708,8 @@ public abstract class ApplicationResource {
     protected NiFiProperties getProperties() {
         return properties;
     }
+
+    public static enum ReplicationTarget {
+        CLUSTER_NODES, CLUSTER_COORDINATOR;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
index 7428fb0..fea5d30 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
@@ -16,13 +16,23 @@
  */
 package org.apache.nifi.web.api;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationRequest;
@@ -43,21 +53,13 @@ import org.apache.nifi.web.api.entity.CounterEntity;
 import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.Entity;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.Collections;
-import java.util.Set;
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 
 /**
@@ -148,7 +150,17 @@ public class CountersResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse;
+
+                // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+                // to the cluster nodes themselves.
+                if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                    nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                } else {
+                    final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+                    nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
+                }
+
                 final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -164,10 +176,7 @@ public class CountersResource extends ApplicationResource {
                     throw new UnknownNodeException("The specified cluster node does not exist.");
                 }
 
-                final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
-                // replicate the request to the specific node
-                return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+                return replicate(HttpMethod.GET, targetNode);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
index 2be19e6..18765d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
@@ -16,12 +16,29 @@
  */
 package org.apache.nifi.web.api;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
@@ -41,29 +58,12 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a flowfile queue.
@@ -168,10 +168,7 @@ public class FlowFileQueueResource extends ApplicationResource {
                     throw new UnknownNodeException("The specified cluster node does not exist.");
                 }
 
-                final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
-                // replicate the request to the specific node
-                return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+                return replicate(HttpMethod.GET, targetNode);
             }
         }
 
@@ -256,10 +253,7 @@ public class FlowFileQueueResource extends ApplicationResource {
                     throw new UnknownNodeException("The specified cluster node does not exist.");
                 }
 
-                final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
-                // replicate the request to the specific node
-                return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+                return replicate(HttpMethod.GET, targetNode);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index cc7d5aa..961984b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,13 +16,31 @@
  */
 package org.apache.nifi.web.api;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationRequest;
@@ -104,29 +122,13 @@ import org.apache.nifi.web.api.request.DateTimeParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a Flow.
@@ -1219,7 +1221,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final ProcessorStatusEntity entity = (ProcessorStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1301,7 +1303,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1383,7 +1385,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1465,7 +1467,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final RemoteProcessGroupStatusEntity entity = (RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1556,7 +1558,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1659,7 +1661,7 @@ public class FlowResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
                 final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 0a94fea..624937f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,19 +16,42 @@
  */
 package org.apache.nifi.web.api;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.web.AuthorizableLookup;
 import org.apache.nifi.web.NiFiServiceFacade;
@@ -65,33 +88,14 @@ import org.apache.nifi.web.api.request.LongParameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a Group.
@@ -1981,8 +1985,14 @@ public class ProcessGroupResource extends ApplicationResource {
             final Map<String, String> headersToOverride = new HashMap<>();
             headersToOverride.put("content-type", MediaType.APPLICATION_XML);
 
-            // replicate the request
-            return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+            // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+            // to the cluster nodes themselves.
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+            } else {
+                final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+                return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse();
+            }
         }
 
         // otherwise import the template locally

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
index cbe6cf7..429fb4b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.web.api;
 
+import java.util.Collections;
+import java.util.Set;
+
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -37,6 +40,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
 import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
@@ -125,7 +129,17 @@ public class SystemDiagnosticsResource extends ApplicationResource {
         if (isReplicateRequest()) {
             // determine where this request should be sent
             if (clusterNodeId == null) {
-                final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                final NodeResponse nodeResponse;
+
+                // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+                // to the cluster nodes themselves.
+                if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                    nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+                } else {
+                    final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+                    nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
+                }
+
                 final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
 
                 // ensure there is an updated entity (result of merging) and prune the response as necessary

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
deleted file mode 100644
index 56c87e9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.api.config;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.nifi.util.StringUtils;
-import org.apache.nifi.web.concurrent.LockExpiredException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Provider
-public class LockExpiredExceptionMapper implements ExceptionMapper<LockExpiredException> {
-    private static final Logger logger = LoggerFactory.getLogger(InvalidRevisionExceptionMapper.class);
-
-    @Override
-    public Response toResponse(LockExpiredException exception) {
-        // log the error
-        logger.warn(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT));
-
-        if (logger.isDebugEnabled()) {
-            logger.debug(StringUtils.EMPTY, exception);
-        }
-
-        return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
new file mode 100644
index 0000000..4b15a70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.config;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps a {@link NoClusterCoordinatorException} to a {@code 503 Service Unavailable} response whose
+ * plain-text body is the exception's message.
+ */
+@Provider
+public class NoClusterCoordinatorExceptionMapper implements ExceptionMapper<NoClusterCoordinatorException> {
+    private static final Logger logger = LoggerFactory.getLogger(NoClusterCoordinatorExceptionMapper.class);
+
+    /**
+     * Logs the failure and builds the response that is returned to the client.
+     *
+     * @param ex the exception being mapped
+     * @return a {@code 503 Service Unavailable} response carrying {@code ex.getMessage()} as {@code text/plain}
+     */
+    @Override
+    public Response toResponse(final NoClusterCoordinatorException ex) {
+        // log the error
+        logger.info(String.format("Cluster failed processing request: %s. Returning %s response.", ex, Response.Status.SERVICE_UNAVAILABLE));
+
+        // the stack trace is only logged when debug logging is enabled
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, ex);
+        }
+
+        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(ex.getMessage()).type("text/plain").build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 15d2f0f..2e42efb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -163,13 +163,8 @@
         <property name="clusterCoordinator" ref="clusterCoordinator"/>
         <property name="heartbeatMonitor" ref="heartbeatMonitor" />
         <property name="bulletinRepository" ref="bulletinRepository"/>
-        <property name="lockManager" ref="lockManager" />
     </bean>
     
-    <bean id="lockManager" class="org.apache.nifi.web.concurrent.DistributedReadWriteLock">
-        <constructor-arg ref="nifiProperties" />
-    </bean>
-
     <!-- component ui extension configuration context -->
     <bean id="nifiWebConfigurationContext" class="org.apache.nifi.web.StandardNiFiWebConfigurationContext">
         <property name="serviceFacade" ref="serviceFacade"/>
@@ -397,12 +392,12 @@
     <bean class="org.apache.nifi.web.api.config.IllegalNodeReconnectionExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.IllegalStateExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.InvalidRevisionExceptionMapper" scope="singleton"/>
-    <bean class="org.apache.nifi.web.api.config.LockExpiredExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.JsonMappingExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.JsonParseExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.MutableRequestExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.NiFiCoreExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.NoConnectedNodesExceptionMapper" scope="singleton"/>
+    <bean class="org.apache.nifi.web.api.config.NoClusterCoordinatorExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.NoResponseFromNodesExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper" scope="singleton"/>
     <bean class="org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper" scope="singleton"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
deleted file mode 100644
index 608d0c0..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-public class DistributedReadWriteLock implements DistributedLockingManager {
-    private final DistributedLock readLock;
-    private final DistributedLock writeLock;
-
-    public DistributedReadWriteLock(final NiFiProperties properties) {
-        this(FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
-            NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
-    }
-
-    public DistributedReadWriteLock(final long lockExpirationPeriod, final TimeUnit lockExpirationUnit) {
-        final ReadWriteLockSync sync = new ReadWriteLockSync();
-        readLock = new ReentrantDistributedLock(LockMode.SHARED, sync, lockExpirationPeriod, lockExpirationUnit);
-        writeLock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, lockExpirationPeriod, lockExpirationUnit);
-    }
-
-    @Override
-    public DistributedLock getReadLock() {
-        return readLock;
-    }
-
-    @Override
-    public DistributedLock getWriteLock() {
-        return writeLock;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
deleted file mode 100644
index a34bb5b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.TimeUnit;
-
-public class LockInfo {
-    private final String versionId;
-    private final int lockCount;
-    private final LockMode lockMode;
-    private final long expirationTime;
-
-    public LockInfo(final String versionId, final LockMode lockMode, final int lockCount, final long expirationPeriod, final TimeUnit expirationUnit) {
-        this.versionId = versionId;
-        this.lockMode = lockMode;
-        this.lockCount = lockCount;
-        this.expirationTime = System.nanoTime() + expirationUnit.toNanos(expirationPeriod);
-    }
-
-    public boolean isExpired() {
-        return System.nanoTime() > expirationTime;
-    }
-
-    public String getVersionId() {
-        return versionId;
-    }
-
-    public int getLockCount() {
-        return lockCount;
-    }
-
-    public LockMode getLockMode() {
-        return lockMode;
-    }
-
-    @Override
-    public String toString() {
-        return "LockInfo[versionId=" + versionId + ", lockMode=" + lockMode + ", lockCount = " + lockCount + ", expired=" + isExpired() + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
deleted file mode 100644
index 2cc5eac..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-public enum LockMode {
-
-    SHARED,
-
-    MUTUALLY_EXCLUSIVE;
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
deleted file mode 100644
index ef32f8f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-public class ReadWriteLockSync {
-    private final AtomicReference<LockInfo> lockInfoRef = new AtomicReference<>();
-
-    public LockInfo get() {
-        return lockInfoRef.get();
-    }
-
-    public boolean update(final LockInfo currentLock, final LockInfo updatedLock) {
-        return lockInfoRef.compareAndSet(currentLock, updatedLock);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
deleted file mode 100644
index c51feec..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReentrantDistributedLock implements DistributedLock {
-    private static final Logger logger = LoggerFactory.getLogger(ReentrantDistributedLock.class);
-
-    private final long expirationNanos;
-
-    private final ReadWriteLockSync sync;
-    private final LockMode lockMode;
-
-    public ReentrantDistributedLock(final LockMode lockMode, final ReadWriteLockSync sync, final long expirationTimePeriod, final TimeUnit expirationTimeUnit) {
-        this.lockMode = lockMode;
-        this.sync = sync;
-        this.expirationNanos = expirationTimeUnit.toNanos(expirationTimePeriod);
-    }
-
-    int getClaimCount() {
-        final LockInfo currentInfo = sync.get();
-        if (currentInfo == null || currentInfo.isExpired()) {
-            return 0;
-        }
-
-        return currentInfo.getLockCount();
-    }
-
-    @Override
-    public String lock() {
-        return lock(null);
-    }
-
-    @Override
-    public String lock(final String versionIdentifier) {
-        return tryLock(-1L, TimeUnit.MILLISECONDS, versionIdentifier);
-    }
-
-    @Override
-    public String tryLock(final long time, final TimeUnit timeUnit) {
-        return tryLock(time, timeUnit, null);
-    }
-
-    @Override
-    public String tryLock(final long timePeriod, final TimeUnit timeUnit, final String versionIdentifier) {
-        final long stopTryingTime = timePeriod < 0 ? -1L : System.nanoTime() + timeUnit.toNanos(timePeriod);
-        logger.debug("Attempting to obtain {} lock with a max wait of {} {}", lockMode, timePeriod, timeUnit);
-
-        long i = 0;
-        while (true) {
-            if (i++ > 0) {
-                if (stopTryingTime > 0L && System.nanoTime() > stopTryingTime) {
-                    logger.debug("Failed to obtain {} lock within {} {}; returning null for tryLock", lockMode, timePeriod, timeUnit);
-                    return null;
-                }
-
-                // If not the first time we've reached this point, we want to
-                // give other threads a chance to release their locks before
-                // we enter the synchronized block.
-                Thread.yield();
-            }
-
-            synchronized (sync) {
-                final LockInfo currentInfo = sync.get();
-                logger.trace("Current Lock Info = {}", currentInfo);
-
-                if (currentInfo == null || currentInfo.isExpired()) {
-                    // There is no lock currently held. Attempt to obtain the lock.
-                    final String versionId = versionIdentifier == null ? UUID.randomUUID().toString() : versionIdentifier;
-                    final boolean updated = updateLockInfo(currentInfo, versionId, 1);
-
-                    if (updated) {
-                        // Lock has been obtained. Return the current version.
-                        logger.debug("Obtained {} lock with Version ID {}", lockMode, versionId);
-                        return versionId;
-                    } else {
-                        // Try again.
-                        logger.debug("Failed to update atomic reference. Trying again");
-                        continue;
-                    }
-                } else {
-                    // There is already a lock held. If the lock that is being held is SHARED,
-                    // and this is a SHARED lock, then we can use it.
-                    if (lockMode == LockMode.SHARED && currentInfo.getLockMode() == LockMode.SHARED) {
-                        logger.debug("Lock is already held but is a shared lock. Attempting to increment lock count");
-
-                        // lock being held is a shared lock, and this is a shared lock. We can just
-                        // update the Lock Info by incrementing the lock count and using a new expiration time.
-                        final boolean updated = updateLockInfo(currentInfo, currentInfo.getVersionId(), currentInfo.getLockCount() + 1);
-                        if (updated) {
-                            // lock info was updated. Return the current version.
-                            logger.debug("Incremented lock count. Obtained {} lock with Version ID {}", lockMode, currentInfo.getVersionId());
-                            return currentInfo.getVersionId();
-                        } else {
-                            // failed to update the lock info. The lock has expired, so we have to start over.
-                            logger.debug("Failed to update atomic reference. Trying again");
-                            continue;
-                        }
-                    } else {
-                        // either the lock being held is a mutex or this lock requires a mutex. Either
-                        // way, we cannot enter the lock, so we will wait a bit and then retry.
-                        // We wait before entering synchronized block, because we don't want to overuse
-                        // the CPU and we want to give other threads a chance to unlock the lock.
-                        logger.debug("Cannot obtain {} lock because it is already held and cannot be shared. Trying again", lockMode);
-                        continue;
-                    }
-                }
-            }
-        }
-    }
-
-    protected boolean updateLockInfo(final LockInfo currentInfo, final String versionId, final int lockCount) {
-        final LockInfo newInfo = new LockInfo(versionId, lockMode, lockCount, expirationNanos, TimeUnit.NANOSECONDS);
-        return sync.update(currentInfo, newInfo);
-    }
-
-    @Override
-    public <T> T withLock(final String identifier, final Supplier<T> action) throws LockExpiredException {
-        synchronized (sync) {
-            verifyIdentifier(identifier, sync.get());
-            return action.get();
-        }
-    }
-
-    @Override
-    public void unlock(final String identifier) throws LockExpiredException {
-        synchronized (sync) {
-            final LockInfo info = sync.get();
-            verifyIdentifier(identifier, info);
-
-            final int newLockCount = info.getLockCount() - 1;
-            if (newLockCount <= 0) {
-                sync.update(info, null);
-            } else {
-                sync.update(info, new LockInfo(info.getVersionId(), lockMode, newLockCount, expirationNanos, TimeUnit.NANOSECONDS));
-            }
-        }
-    }
-
-    private void verifyIdentifier(final String identifier, final LockInfo lockInfo) throws LockExpiredException {
-        if (lockInfo == null) {
-            throw new LockExpiredException("No lock has been obtained");
-        }
-
-        if (!lockInfo.getVersionId().equals(identifier)) {
-            throw new LockExpiredException("Incorrect Lock ID provided. This typically means that the lock has already expired and another lock has been obtained.");
-        }
-
-        if (lockInfo.isExpired()) {
-            throw new LockExpiredException("Lock has already expired");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
deleted file mode 100644
index 8027614..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestReentrantDistributedLock {
-    private ReadWriteLockSync sync;
-
-    @Before
-    public void setup() {
-        sync = new ReadWriteLockSync();
-    }
-
-    @Test(timeout = 5000)
-    public void testMultipleReadLocks() throws LockExpiredException {
-        final ReentrantDistributedLock lock = createReadLock();
-        final String id1 = lock.lock();
-        final String id2 = lock.lock();
-        assertEquals(id1, id2);
-
-        assertEquals(2, lock.getClaimCount());
-        lock.unlock(id1);
-        assertEquals(1, lock.getClaimCount());
-        lock.unlock(id2);
-        assertEquals(0, lock.getClaimCount());
-    }
-
-    @Test(timeout = 10000)
-    public void testMultipleWriteLocksBlock() throws LockExpiredException {
-        final ReentrantDistributedLock lock = createWriteLock();
-        final String id1 = lock.lock();
-        assertNotNull(id1);
-
-        final long startTime = System.nanoTime();
-        final String id2 = lock.tryLock(500, TimeUnit.MILLISECONDS);
-        assertNull(id2);
-
-        // We don't know exactly how long it will take to timeout because the time periods
-        // won't be exact, but it should take more than 350 milliseconds.
-        final long nanos = System.nanoTime() - startTime;
-        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
-        lock.unlock(id1);
-        final String id3 = lock.tryLock(500, TimeUnit.MILLISECONDS);
-        assertNotNull(id3);
-        assertNotSame(id1, id3);
-        lock.unlock(id3);
-    }
-
-    @Test(timeout = 10000)
-    public void testReadLockBlocksWriteLock() throws LockExpiredException {
-        final ReentrantDistributedLock readLock = createReadLock();
-        final ReentrantDistributedLock writeLock = createWriteLock();
-
-        final String id1 = readLock.lock();
-        assertNotNull(id1);
-
-        final long startTime = System.nanoTime();
-        final String id2 = writeLock.tryLock(500, TimeUnit.MILLISECONDS);
-        assertNull(id2);
-
-        // We don't know exactly how long it will take to timeout because the time periods
-        // won't be exact, but it should take more than 350 milliseconds.
-        final long nanos = System.nanoTime() - startTime;
-        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
-        readLock.unlock(id1);
-
-        final String id3 = writeLock.lock();
-        assertNotNull(id3);
-        assertNotSame(id1, id3);
-
-        writeLock.unlock(id3);
-    }
-
-    @Test(timeout = 10000)
-    public void testWriteLockBlocksReadLock() throws LockExpiredException {
-        final ReentrantDistributedLock readLock = createReadLock();
-        final ReentrantDistributedLock writeLock = createWriteLock();
-
-        final String id1 = writeLock.lock();
-        assertNotNull(id1);
-
-        final long startTime = System.nanoTime();
-        final String id2 = readLock.tryLock(500, TimeUnit.MILLISECONDS);
-        assertNull(id2);
-
-        // We don't know exactly how long it will take to timeout because the time periods
-        // won't be exact, but it should take more than 350 milliseconds.
-        final long nanos = System.nanoTime() - startTime;
-        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
-        writeLock.unlock(id1);
-
-        final String id3 = readLock.lock();
-        assertNotNull(id3);
-        assertNotSame(id1, id3);
-
-        readLock.unlock(id3);
-    }
-
-    @Test(timeout = 10000)
-    public void testMultipleReadLocksBlockingOnWriteLock() throws InterruptedException, LockExpiredException {
-        final ReentrantDistributedLock readLock = createReadLock();
-        final ReentrantDistributedLock writeLock = createWriteLock();
-
-        final String id1 = writeLock.lock();
-        assertNotNull(id1);
-
-        final ExecutorService executor = Executors.newFixedThreadPool(3);
-        final AtomicReferenceArray<String> array = new AtomicReferenceArray<>(3);
-        for (int i = 0; i < 3; i++) {
-            final int index = i;
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    final String id = readLock.lock();
-                    assertNotNull(id);
-                    array.set(index, id);
-                }
-            });
-        }
-
-        // wait a bit and then make sure that no values have been set
-        Thread.sleep(250L);
-        for (int i = 0; i < 3; i++) {
-            assertNull(array.get(i));
-        }
-
-        // unlock so that the readers can lock.
-        writeLock.unlock(id1);
-
-        executor.shutdown();
-        executor.awaitTermination(10, TimeUnit.MINUTES);
-
-        final String id = array.get(0);
-        assertNotNull(id);
-        for (int i = 0; i < 3; i++) {
-            assertEquals(id, array.get(i));
-        }
-
-        for (int i = 0; i < 3; i++) {
-            assertEquals(3 - i, readLock.getClaimCount());
-            readLock.unlock(id);
-        }
-
-        assertEquals(0, readLock.getClaimCount());
-    }
-
-    @Test(timeout = 10000)
-    public void testLockExpires() {
-        final ReentrantDistributedLock lock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 25, TimeUnit.MILLISECONDS);
-        final String id1 = lock.lock();
-        assertNotNull(id1);
-
-        final long start = System.nanoTime();
-        final String id2 = lock.lock();
-        final long nanos = System.nanoTime() - start;
-
-        assertNotNull(id2);
-        assertNotSame(id1, id2);
-
-        // The timeout may not entirely elapse but will be close. Give 5 milliseconds buffer
-        assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(20));
-    }
-
-    @Test(timeout = 10000)
-    public void testWithLock() throws LockExpiredException, Exception {
-        final ReentrantDistributedLock lock = createWriteLock();
-        final String id = lock.lock();
-        assertEquals(1, lock.getClaimCount());
-
-        final Object obj = new Object();
-        final Object returned = lock.withLock(id, () -> obj);
-        assertTrue(returned == obj);
-        assertEquals(1, lock.getClaimCount());
-        lock.unlock(id);
-        assertEquals(0, lock.getClaimCount());
-    }
-
-    private ReentrantDistributedLock createReadLock() {
-        return new ReentrantDistributedLock(LockMode.SHARED, sync, 30, TimeUnit.SECONDS);
-    }
-
-    private ReentrantDistributedLock createWriteLock() {
-        return new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 30, TimeUnit.SECONDS);
-    }
-}


[2/2] nifi git commit: NIFI-2185: Proxy requests through the cluster coordinator rather than making use of distributed read/write locks. This closes #621

Posted by mc...@apache.org.
NIFI-2185: Proxy requests through the cluster coordinator rather than making use of distributed read/write locks. This closes #621


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf183e15
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf183e15
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf183e15

Branch: refs/heads/master
Commit: cf183e15e3d400a138791ee5ca79287e87ebd932
Parents: b836db2
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 8 12:58:06 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Jul 11 08:12:44 2016 -0400

----------------------------------------------------------------------
 .../coordination/ClusterCoordinator.java        |  25 +++
 .../coordination/node/NodeConnectionStatus.java |   8 +-
 .../http/replication/RequestReplicator.java     |  26 ++-
 .../ThreadPoolRequestReplicator.java            |  35 ++-
 .../node/NodeClusterCoordinator.java            | 147 ++++++++++---
 .../NoClusterCoordinatorException.java          |  31 +++
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  28 ++-
 .../TestThreadPoolRequestReplicator.java        |  17 +-
 .../node/TestNodeClusterCoordinator.java        |  36 ++--
 .../nifi/web/concurrent/DistributedLock.java    | 111 ----------
 .../concurrent/DistributedLockingManager.java   |  71 ------
 .../web/concurrent/LockExpiredException.java    |  26 ---
 .../nifi/web/revision/RevisionManager.java      |   7 -
 .../apache/nifi/controller/FlowController.java  |  24 ++-
 .../nifi/controller/StandardFlowService.java    |   5 +-
 .../nifi/spring/FlowControllerFactoryBean.java  |   3 +
 .../org/apache/nifi/web/NiFiServiceFacade.java  | 100 ---------
 .../nifi/web/StandardNiFiContentAccess.java     |  12 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  49 -----
 .../StandardNiFiWebConfigurationContext.java    | 112 +++++-----
 .../nifi/web/api/ApplicationResource.java       | 171 +++++++++------
 .../apache/nifi/web/api/CountersResource.java   |  63 +++---
 .../nifi/web/api/FlowFileQueueResource.java     |  68 +++---
 .../org/apache/nifi/web/api/FlowResource.java   |  74 +++----
 .../nifi/web/api/ProcessGroupResource.java      |  84 ++++----
 .../nifi/web/api/SystemDiagnosticsResource.java |  16 +-
 .../api/config/LockExpiredExceptionMapper.java  |  44 ----
 .../NoClusterCoordinatorExceptionMapper.java    |  44 ++++
 .../src/main/resources/nifi-web-api-context.xml |   7 +-
 .../concurrent/DistributedReadWriteLock.java    |  49 -----
 .../apache/nifi/web/concurrent/LockInfo.java    |  55 -----
 .../apache/nifi/web/concurrent/LockMode.java    |  26 ---
 .../nifi/web/concurrent/ReadWriteLockSync.java  |  32 ---
 .../concurrent/ReentrantDistributedLock.java    | 174 ---------------
 .../TestReentrantDistributedLock.java           | 216 -------------------
 35 files changed, 685 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 7cd9f80..a2e17b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -167,6 +167,17 @@ public interface ClusterCoordinator {
     NodeIdentifier getPrimaryNode();
 
     /**
+     * @return the identifier of the node that is elected the active cluster coordinator, or <code>null</code> if
+     *         there is no active cluster coordinator elected.
+     */
+    NodeIdentifier getElectedActiveCoordinatorNode();
+
+    /**
+     * @return <code>true</code> if this node has been elected the active cluster coordinator, <code>false</code> otherwise.
+     */
+    boolean isActiveClusterCoordinator();
+
+    /**
      * Updates the Flow Service to use for obtaining the current flow
      *
      * @param flowService the flow service to use for obtaining the current flow
@@ -200,4 +211,18 @@ public interface ClusterCoordinator {
      * @return <code>true</code> if connected, <code>false</code> otherwise
      */
     boolean isConnected();
+
+    /**
+     * Notifies the cluster coordinator that this node has been granted the given role
+     *
+     * @param clusterRole the role that this node has been granted
+     */
+    void addRole(String clusterRole);
+
+    /**
+     * Notifies the cluster coordinator that this node is no longer responsible for the given role
+     *
+     * @param clusterRole the role that this node is no longer responsible for
+     */
+    void removeRole(String clusterRole);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
index 4570a24..23e509d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -43,18 +43,11 @@ public class NodeConnectionStatus {
     private final Long connectionRequestTime;
     private final Set<String> roles;
 
-    public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) {
-        this(nodeId, state, null, null, null, null);
-    }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final Set<String> roles) {
         this(nodeId, state, null, null, null, roles);
     }
 
-    public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final long connectionRequestTime) {
-        this(nodeId, state, null, null, connectionRequestTime, null);
-    }
-
     public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
         this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.name(), null, null);
     }
@@ -129,6 +122,7 @@ public class NodeConnectionStatus {
         if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) {
             sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason());
         }
+        sb.append(", roles=").append(getRoles());
         sb.append(", updateId=").append(getUpdateIdentifier());
         sb.append("]");
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index fad7454..6724901 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -27,7 +27,6 @@ public interface RequestReplicator {
 
     public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
     public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
-    public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
 
     /**
      * The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value
@@ -45,6 +44,25 @@ public interface RequestReplicator {
     public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
 
     /**
+     * When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
+     * If the request needs to be replicated by another node, it first replicates the request to the coordinator,
+     * which then replicates the request on the node's behalf. This header name and value are used to denote
+     * that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
+     * the request. This allows us to know that the request should be serviced, rather than proxied back to the
+     * cluster coordinator.
+     */
+    public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
+
+    /**
+     * When replicating a request to the cluster coordinator, it may be useful to denote that the request should
+     * be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
+     * we know lives on a specific node. This request must still be replicated through the cluster coordinator.
+     * This header tells the cluster coordinator the UUIDs (comma-separated list, possibly with spaces between)
+     * of the nodes that the request should be replicated to.
+     */
+    public static final String REPLICATION_TARGET_NODE_UUID_HEADER = "X-Replication-Target-Id";
+
+    /**
      * Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
      */
     void shutdown();
@@ -54,7 +72,7 @@ public interface RequestReplicator {
      * Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node
      * that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object
      * will contain the results that are immediately available, as well as an identifier for obtaining an updated result
-     * later.
+     * later. NOTE: This method will ALWAYS indicate that the request has been replicated.
      *
      * @param method the HTTP method (e.g., POST, PUT)
      * @param uri the base request URI (up to, but not including, the query string)
@@ -78,10 +96,12 @@ public interface RequestReplicator {
      * @param uri the base request URI (up to, but not including, the query string)
      * @param entity an entity
      * @param headers any HTTP headers
+     * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
+     *            has already been replicated, so the receiving node will not replicate the request itself.
      *
      * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
      */
-    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers);
+    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated);
 
     /**
      * <p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 7a9b56f..f36a4f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -36,6 +36,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -91,6 +94,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, AtomicInteger> sequentialLongRequestCounts = new ConcurrentHashMap<>();
 
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
     /**
      * Creates an instance using a connection timeout and read timeout of 3 seconds
      *
@@ -204,14 +211,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         }
 
         final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
-        return replicate(nodeIdSet, method, uri, entity, headers);
+
+        return replicate(nodeIdSet, method, uri, entity, headers, true);
     }
 
     @Override
-    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) {
+    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated) {
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
         updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString());
-        updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
+
+        if (indicateReplicated) {
+            updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
+        }
 
         // If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
         // it knows that we are acting as a proxy on behalf of the current user.
@@ -221,7 +232,23 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
         }
 
-        return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+        if (indicateReplicated) {
+            // If we are replicating a request and indicating that it is replicated, then this means that we are
+            // performing an action, rather than simply proxying the request to the cluster coordinator. In this case,
+            // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
+            // time, so we use a write lock if the request is mutable and a read lock otherwise.
+            final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
+            logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
+            lock.lock();
+            try {
+                logger.debug("Lock {} obtained in order to replicate request {} {}", lock, method, uri);
+                return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+            } finally {
+                lock.unlock();
+            }
+        } else {
+            return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index a90be64..1b410d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -17,6 +17,20 @@
 
 package org.apache.nifi.cluster.coordination.node;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
@@ -49,20 +63,6 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
     private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
     private static final String EVENT_CATEGORY = "Clustering";
@@ -92,10 +92,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         senderListener.addHandler(this);
     }
 
+    // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+    // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+    // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+    // seen by other nodes).
     @Override
-    public void shutdown() {
+    public synchronized void shutdown() {
         final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
-        notifyOthersOfNodeStatusChange(shutdownStatus);
+        updateNodeStatus(shutdownStatus);
         logger.info("Successfully notified other nodes that I am shutting down");
     }
 
@@ -108,6 +112,22 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return nodeId;
     }
 
+    private NodeIdentifier waitForLocalNodeIdentifier() { // blocks, polling every 100 ms, until getLocalNodeIdentifier() is non-null
+        NodeIdentifier localNodeId = null;
+        while (localNodeId == null) {
+            localNodeId = getLocalNodeIdentifier();
+            if (localNodeId == null) {
+                try {
+                    Thread.sleep(100L);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt(); // NOTE(review): loop continues with the flag set, so later sleep() calls throw at once and this spins until an ID appears
+                }
+            }
+        }
+
+        return localNodeId;
+    }
+
     @Override
     public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
         logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);
@@ -163,7 +183,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn);
         }
 
-        updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, System.currentTimeMillis()));
+        updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(nodeId)));
 
         // create the request
         final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
@@ -173,6 +193,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         requestReconnectionAsynchronously(request, 10, 5);
     }
 
+    private Set<String> getRoles(final NodeIdentifier nodeId) {
+        final NodeConnectionStatus status = getConnectionStatus(nodeId);
+        return status == null ? Collections.emptySet() : status.getRoles();
+    }
+
     @Override
     public void finishNodeConnection(final NodeIdentifier nodeId) {
         final NodeConnectionState state = getConnectionState(nodeId);
@@ -194,12 +219,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
 
         logger.info("{} is now connected", nodeId);
-        final boolean updated = updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
-        if (!updated) {
-            logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId);
-        }
+        updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId)));
     }
 
+
     @Override
     public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
         logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
@@ -245,7 +268,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
         nodeStatuses.remove(nodeId);
         nodeEvents.remove(nodeId);
-        notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
+        notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED, Collections.emptySet()));
     }
 
     @Override
@@ -360,6 +383,47 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return null;
     }
 
+    // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+    // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+    // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+    // seen by other nodes).
+    @Override
+    public synchronized void addRole(final String clusterRole) {
+        final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
+        final NodeConnectionStatus status = getConnectionStatus(localNodeId);
+        final Set<String> roles = new HashSet<>();
+        if (status != null) {
+            roles.addAll(status.getRoles());
+        }
+
+        final boolean roleAdded = roles.add(clusterRole);
+
+        if (roleAdded) {
+            updateNodeRoles(localNodeId, roles);
+            logger.info("Cluster role {} added. This node is now responsible for the following roles: {}", clusterRole, roles);
+        }
+    }
+
+    // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+    // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+    // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+    // seen by other nodes).
+    @Override
+    public synchronized void removeRole(final String clusterRole) {
+        final NodeIdentifier localNodeId = waitForLocalNodeIdentifier();
+        final NodeConnectionStatus status = getConnectionStatus(localNodeId);
+        final Set<String> roles = new HashSet<>();
+        if (status != null) {
+            roles.addAll(status.getRoles());
+        }
+
+        final boolean roleRemoved = roles.remove(clusterRole);
+
+        if (roleRemoved) {
+            updateNodeRoles(localNodeId, roles);
+            logger.info("Cluster role {} removed. This node is now responsible for the following roles: {}", clusterRole, roles);
+        }
+    }
 
     @Override
     public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... states) {
@@ -390,6 +454,31 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     @Override
+    public NodeIdentifier getElectedActiveCoordinatorNode() { // returns the first CONNECTED node holding the coordinator role, or null if none
+        final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
+        return connectedNodeIds.stream()
+            .map(nodeId -> getConnectionStatus(nodeId))
+            .filter(status -> status != null && status.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)) // status can be null, as getRoles() also assumes
+            .findFirst()
+            .map(status -> status.getNodeIdentifier())
+            .orElse(null);
+    }
+
+    @Override
+    public boolean isActiveClusterCoordinator() {
+        final NodeIdentifier self = getLocalNodeIdentifier();
+        if (self == null) {
+            return false;
+        }
+
+        final NodeConnectionStatus selfStatus = getConnectionStatus(self);
+        if (selfStatus == null) {
+            return false;
+        }
+        return selfStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR);
+    }
+
+    @Override
     public List<NodeEvent> getNodeEvents(final NodeIdentifier nodeId) {
         final CircularFifoQueue<NodeEvent> eventQueue = nodeEvents.get(nodeId);
         if (eventQueue == null) {
@@ -426,10 +515,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
      * if successful, <code>false</code> if no node exists with the given ID
      *
      * @param status the new status of the node
-     * @return <code>true</code> if the node exists and is updated, <code>false</code> if the node does not exist
      */
     // visible for testing.
-    boolean updateNodeStatus(final NodeConnectionStatus status) {
+    void updateNodeStatus(final NodeConnectionStatus status) {
         final NodeIdentifier nodeId = status.getNodeIdentifier();
 
         // In this case, we are using nodeStatuses.put() instead of getting the current value and
@@ -445,8 +533,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         if (currentState == null || currentState != status.getState()) {
             notifyOthersOfNodeStatusChange(status);
         }
-
-        return true;
     }
 
 
@@ -656,7 +742,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
         }
 
-        status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, System.currentTimeMillis());
+        status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(resolvedNodeIdentifier));
         updateNodeStatus(status);
 
         DataFlow dataFlow = null;
@@ -741,9 +827,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
             // disconnect problematic nodes
             if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size()) {
-                logger.warn(String.format("The following nodes failed to process URI %s '%s'.  Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses));
-                for (final NodeResponse nodeResponse : problematicNodeResponses) {
-                    requestNodeDisconnect(nodeResponse.getNodeId(), DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process URI to " + method + " " + uriPath);
+                final Set<NodeIdentifier> failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet());
+                logger.warn(String.format("The following nodes failed to process URI %s '%s'.  Requesting each node disconnect from cluster.", uriPath, failedNodeIds));
+                for (final NodeIdentifier nodeId : failedNodeIds) {
+                    requestNodeDisconnect(nodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + method + " " + uriPath);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
new file mode 100644
index 0000000..89b6722
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.exception;
+
+public class NoClusterCoordinatorException extends ClusterException {
+    private static final long serialVersionUID = -1782098541351698293L;
+
+    public NoClusterCoordinatorException() {
+        super("Action cannot be performed because there is currently no Cluster Coordinator elected. "
+            + "The request should be tried again after a moment, after a Cluster Coordinator has been automatically elected.");
+    }
+
+    public NoClusterCoordinatorException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 7640787..81d72ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -158,7 +158,7 @@ public class TestAbstractHeartbeatMonitor {
 
 
     private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
-        final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
+        final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state, Collections.emptySet());
         return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, Collections.emptySet(), 0, 0, 0, 0);
     }
 
@@ -184,7 +184,7 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public synchronized void requestNodeConnect(NodeIdentifier nodeId, String userDn) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, Collections.emptySet()));
         }
 
         @Override
@@ -194,17 +194,17 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
         }
 
         @Override
         public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
         }
 
         @Override
         public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
-            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
         }
 
         @Override
@@ -287,6 +287,24 @@ public class TestAbstractHeartbeatMonitor {
         @Override
         public void setConnected(boolean connected) {
         }
+
+        @Override
+        public NodeIdentifier getElectedActiveCoordinatorNode() {
+            return null;
+        }
+
+        @Override
+        public boolean isActiveClusterCoordinator() {
+            return false;
+        }
+
+        @Override
+        public void addRole(String clusterRole) {
+        }
+
+        @Override
+        public void removeRole(String clusterRole) {
+        }
     }
 
     public static class ReportedEvent {

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index a9c0af9..2af3e88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -28,6 +28,7 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -86,7 +87,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1");
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
 
             // We should get back the same response object
             assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -114,7 +115,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1");
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
 
             // We should get back the same response object
             assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -150,7 +151,7 @@ public class TestThreadPoolRequestReplicator {
             final URI uri = new URI("http://localhost:8080/processors/1");
             final Entity entity = new ProcessorEntity();
 
-            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
             assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
         } , null, 0L, new IllegalArgumentException("Exception created for unit test"));
     }
@@ -163,7 +164,7 @@ public class TestThreadPoolRequestReplicator {
         nodeIds.add(nodeId);
 
         final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class);
-        Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+        Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         final AtomicInteger requestCount = new AtomicInteger(0);
         final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
@@ -190,7 +191,7 @@ public class TestThreadPoolRequestReplicator {
 
         try {
             final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
-                new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>());
+                new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
             clusterResponse.awaitMergedResponse();
 
             // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not.
@@ -209,7 +210,7 @@ public class TestThreadPoolRequestReplicator {
         Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(new Answer<NodeConnectionStatus>() {
             @Override
             public NodeConnectionStatus answer(InvocationOnMock invocation) throws Throwable {
-                return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED);
+                return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED, Collections.emptySet());
             }
         });
 
@@ -234,7 +235,7 @@ public class TestThreadPoolRequestReplicator {
         Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
         final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
             @Override
-            public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) {
+            public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated) {
                 return null;
             }
         };
@@ -307,7 +308,7 @@ public class TestThreadPoolRequestReplicator {
 
         try {
             final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
-                new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>());
+                new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
             clusterResponse.awaitMergedResponse();
 
             Assert.fail("Expected to get an IllegalClusterStateException but did not");

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index f1cccb6..e3d5295 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -89,10 +89,10 @@ public class TestNodeClusterCoordinator {
     public void testConnectionResponseIndicatesAllNodes() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // Create a connection request message and send to the coordinator
         final NodeIdentifier requestedNodeId = createNodeId(6);
@@ -272,10 +272,10 @@ public class TestNodeClusterCoordinator {
     public void testGetConnectionStates() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = coordinator.getConnectionStates();
         assertEquals(4, stateMap.size());
@@ -302,10 +302,10 @@ public class TestNodeClusterCoordinator {
     public void testGetNodeIdentifiers() throws IOException {
         // Add a disconnected node
         coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         final Set<NodeIdentifier> connectedIds = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
         assertEquals(2, connectedIds.size());
@@ -330,7 +330,7 @@ public class TestNodeClusterCoordinator {
     public void testRequestNodeDisconnect() throws InterruptedException {
         // Add a connected node
         final NodeIdentifier nodeId = createNodeId(1);
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
         while (nodeStatusChangeMessages.isEmpty()) {
@@ -356,8 +356,8 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier nodeId1 = createNodeId(1);
         final NodeIdentifier nodeId2 = createNodeId(2);
 
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
 
         // wait for the status change message and clear it
         while (nodeStatusChangeMessages.size() < 2) {
@@ -381,7 +381,7 @@ public class TestNodeClusterCoordinator {
         assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
 
         // Verify that resetMap updates only the newer statuses
-        final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING);
+        final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, Collections.emptySet());
         final Map<NodeIdentifier, NodeConnectionStatus> resetMap = new HashMap<>();
         resetMap.put(nodeId1, oldStatus);
         resetMap.put(nodeId2, node2Disconnecting);
@@ -398,14 +398,14 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier nodeId1 = createNodeId(1);
         final NodeIdentifier nodeId2 = createNodeId(2);
 
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
         // wait for the status change message and clear it
         while (nodeStatusChangeMessages.isEmpty()) {
             Thread.sleep(10L);
         }
         nodeStatusChangeMessages.clear();
 
-        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
+        coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
         // wait for the status change message and clear it
         while (nodeStatusChangeMessages.isEmpty()) {
             Thread.sleep(10L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
deleted file mode 100644
index b5c974f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public interface DistributedLock {
-
-    /**
-     * Obtains a lock, blocking as long as necessary to obtain the lock.
-     * Once a lock has been obtained, the identifier of the version of the lock is returned,
-     * which can be passed to the {@link #withLock(String, Callable)} or
-     * {@link #unlock(String)} method. Once this method returns, it is
-     * important that either the {@link #withLock(String, Callable)} or
-     * {@link #unlock(String)} method be called with this identifier. Otherwise,
-     * any attempt to claim another read lock or write lock will block until this
-     * lock expires.
-     *
-     * @return the identifier
-     */
-    String lock();
-
-    /**
-     * Obtains a lock, blocking as long as necessary to obtain the lock.
-     * Once a lock has been obtained, the identifier of the version of the lock is returned,
-     * which can be passed to the {@link #withLock(String, Callable)} or
-     * {@link #unlock(String)} method. Once this method returns, it is
-     * important that either the {@link #withLock(String, Callable)} or
-     * {@link #unlock(String)} method be called with this identifier. Otherwise,
-     * any attempt to claim another read lock or write lock will block until this
-     * lock expires.
-     *
-     * @param versionIdentifier a value that should be used as the version id instead of generating one.
-     *            This allows us to ensure that all nodes in the cluster use the same id.
-     *
-     * @return the identifier
-     */
-    String lock(String versionIdentifier);
-
-    /**
-     * Waits up to the given amount of time to obtain a lock. If the lock is obtained
-     * within this time period, the identifier will be returned, as with {@link #lock()}.
-     * If the lock cannot be obtained within the given time period, <code>null</code> will
-     * be returned.
-     *
-     * @param time the maximum amount of time to wait for the lock
-     * @param timeUnit the unit of time that the time parameter is in
-     * @return the identifier of the lock, or <code>null</code> if no lock is obtained
-     */
-    String tryLock(long time, TimeUnit timeUnit);
-
-    /**
-     * Waits up to the given amount of time to obtain a lock. If the lock is obtained
-     * within this time period, the identifier will be returned, as with {@link #lock()}.
-     * If the lock cannot be obtained within the given time period, <code>null</code> will
-     * be returned.
-     *
-     * @param time the maximum amount of time to wait for the lock
-     * @param timeUnit the unit of time that the time parameter is in
-     * @param versionIdentifier a value that should be used as the version id instead of generating one.
-     *            This allows us to ensure that all nodes in the cluster use the same id.
-     * @return the identifier of the lock, or <code>null</code> if no lock is obtained
-     */
-    String tryLock(long time, TimeUnit timeUnit, String versionIdentifier);
-
-    /**
-     * Performs the given action while this lock is held. The identifier of the lock that was
-     * obtained by calling {@link #lock()} must be provided to this method. If the
-     * lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException}
-     * will be thrown. This method provides a mechanism for verifying that the lock obtained
-     * by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action
-     * being performed will be done so without the lock expiring (i.e., if the lock expires while
-     * the action is being performed, the lock won't be released until the provided action completes).
-     *
-     * @param identifier the identifier of the lock that has already been obtained
-     * @param action the action to perform
-     *
-     * @return the value returned by the given action
-     *
-     * @throws LockExpiredException if the provided identifier is not the identifier of the currently
-     *             held lock, or if the lock that was obtained has already expired and is no longer valid
-     */
-    <T> T withLock(String identifier, Supplier<T> action) throws LockExpiredException;
-
-    /**
-     * Cancels the lock with the given identifier, so that the lock is no longer valid.
-     *
-     * @param identifier the identifier of the lock that was obtained by calling {@link #lock()}.
-     *
-     * @throws LockExpiredException if the provided identifier is not the identifier of the currently
-     *             held lock, or if the lock that was obtained has already expired and is no longer valid
-     */
-    void unlock(String identifier) throws LockExpiredException;
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
deleted file mode 100644
index 09c1d5d..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-/**
- * <p>
- * A DistributedLockingManager is responsible for exposing a mechanism that
- * clients can use to obtain a lock on the dataflow.
- * </p>
- *
- * <p>
- * Because of the way in which NiFi replicates requests from one node to all
- * other nodes in the cluster, it is important that all nodes in the cluster
- * are able to obtain a lock for the request before the request is allowed to
- * proceed. This is accomplished by using a two-phase approach. For each request
- * that will require a lock (either a read (shared) lock or a write (mutually
- * exclusive) lock), the request must be done in two phases. The first phase is
- * responsible for obtaining the lock and optionally performing validation of
- * the request. Once a node has obtained the necessary lock and performed any
- * required validation, the node will respond to the web request with a status
- * code of 150 - NodeContinue.
- * </p>
- *
- * <p>
- * At this point, the node that originated the request
- * will verify that either all nodes obtained a lock or that at least one node
- * failed to obtain a lock. If all nodes respond with a 150 - NodeContinue,
- * then the second phase of the request will occur. In the second phase, the
- * actual logic of the desired request is performed while the lock is held.
- * The lock is then released, once the logic is performed (or if the logic fails
- * to be performed).
- * </p>
- *
- * <p>
- * In the case that at least one node responds with a status code with than
- * 150 - NodeContinue, the node that originated the request will instead issue
- * a cancel request for the second phase so that all nodes are able to unlock
- * the lock that was previously obtained for the request.
- * </p>
- *
- * <p>
- * A key consideration in this type of approach that must be taken into account
- * is that the node that originated the request could, at any point in time, fail
- * as a result of the process being killed, power loss, network connectivity problems,
- * etc. As a result, the locks that are obtained through a DistributedLockingManager
- * are designed to expire after some amount of time, so that locks are not held
- * indefinitely, even in the case of node failure.
- * </p>
- */
-public interface DistributedLockingManager {
-
-    DistributedLock getReadLock();
-
-    DistributedLock getWriteLock();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
deleted file mode 100644
index c673c9f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-public class LockExpiredException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
-
-    public LockExpiredException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 6b6bc45..e10454f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.concurrent.DistributedLockingManager;
 
 
 /**
@@ -55,12 +54,6 @@ import org.apache.nifi.web.concurrent.DistributedLockingManager;
  * request may continue, this means that all nodes have agreed that the client's Revisios are
  * acceptable.
  * </p>
- *
- * <p>
- * To ensure that the revisions remain consistent between the time that they are validated and
- * the time that the modification takes place, it is important that the revisions always be
- * validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}.
- * </p>
  */
 public interface RevisionManager {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8c710fa..0c90c50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -65,6 +65,7 @@ import org.apache.nifi.authorization.resource.ProvenanceEventAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -305,6 +306,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final List<Connectable> startConnectablesAfterInitialization;
     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
     private final LeaderElectionManager leaderElectionManager;
+    private final ClusterCoordinator clusterCoordinator;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -383,6 +385,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             /* configuredForClustering */ false,
             /* NodeProtocolSender */ null,
             bulletinRepo,
+            /* cluster coordinator */ null,
             /* heartbeat monitor */ null);
     }
 
@@ -394,6 +397,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final StringEncryptor encryptor,
         final NodeProtocolSender protocolSender,
         final BulletinRepository bulletinRepo,
+        final ClusterCoordinator clusterCoordinator,
         final HeartbeatMonitor heartbeatMonitor) {
         final FlowController flowController = new FlowController(
             flowFileEventRepo,
@@ -404,6 +408,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             /* configuredForClustering */ true,
             protocolSender,
             bulletinRepo,
+            clusterCoordinator,
             heartbeatMonitor);
 
         flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
@@ -420,6 +425,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final boolean configuredForClustering,
         final NodeProtocolSender protocolSender,
         final BulletinRepository bulletinRepo,
+        final ClusterCoordinator clusterCoordinator,
         final HeartbeatMonitor heartbeatMonitor) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
@@ -430,6 +436,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         this.heartbeatMonitor = heartbeatMonitor;
         sslContext = SslContextFactory.createSslContext(properties, false);
         extensionManager = new ExtensionManager();
+        this.clusterCoordinator = clusterCoordinator;
 
         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -3181,11 +3188,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public void onLeaderRelinquish() {
                 heartbeatMonitor.stop();
+
+                if (clusterCoordinator != null) {
+                    clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
+                }
             }
 
             @Override
             public void onLeaderElection() {
                 heartbeatMonitor.start();
+
+                if (clusterCoordinator != null) {
+                    clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
+                }
             }
         });
     }
@@ -3819,7 +3834,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 return null;
             }
 
-            final Set<String> roles = bean.isPrimary() ? Collections.singleton(ClusterRoles.PRIMARY_NODE) : Collections.emptySet();
+            final Set<String> roles = new HashSet<>();
+            if (bean.isPrimary()) {
+                roles.add(ClusterRoles.PRIMARY_NODE);
+            }
+            if (clusterCoordinator.isActiveClusterCoordinator()) {
+                roles.add(ClusterRoles.CLUSTER_COORDINATOR);
+            }
+
             final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal());
             final HeartbeatMessage message = new HeartbeatMessage();
             message.setHeartbeat(heartbeat);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index e2bdcf0..d5d40b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -83,6 +83,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -867,7 +868,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN());
             controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure());
 
-            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+            final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();
+            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, roles));
 
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(autoResumeState);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index ecec0a3..7de36c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -18,6 +18,7 @@ package org.apache.nifi.spring;
 
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.controller.FlowController;
@@ -52,6 +53,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
             if (properties.isNode()) {
                 final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
                 final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
+                final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
                 flowController = FlowController.createClusteredInstance(
                     flowFileEventRepository,
                     properties,
@@ -60,6 +62,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
                     encryptor,
                     nodeProtocolSender,
                     bulletinRepository,
+                    clusterCoordinator,
                     heartbeatMonitor);
             } else {
                 flowController = FlowController.createStandaloneInstance(

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 2d093ce..b8d9cfd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.controller.ScheduledState;
@@ -98,7 +97,6 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
 import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
-import org.apache.nifi.web.concurrent.LockExpiredException;
 
 /**
  * Defines the NiFiServiceFacade interface.
@@ -117,104 +115,6 @@ public interface NiFiServiceFacade {
     void authorizeAccess(AuthorizeAccess authorizeAccess);
 
     /**
-     * Obtains a read (shared) lock for the entire flow, so that no other
-     * requests can be made to modify the flow until either this read lock
-     * is released via {@link #releaseReadLock()} or the lock expires
-     *
-     * @return an identifier that indicates the version of the lock, so that other
-     *         requests cannot release a lock that was held by this request
-     */
-    String obtainReadLock();
-
-    /**
-     * Obtains a read (shared) lock for the entire flow, so that no other
-     * requests can be made to modify the flow until either this read lock
-     * is released via {@link #releaseReadLock()} or the lock expires
-     *
-     * @param versionId specifies a value to use for the Version ID for the lock
-     *
-     * @return an identifier that indicates the version of the lock, so that other
-     *         requests cannot release a lock that was held by this request
-     */
-    String obtainReadLock(String versionId);
-
-    /**
-     * Performs the given action while holding the read lock that has already been obtained
-     * with the given versionIdentifier. This allows the given action to be performed without
-     * allowing the read lock to expire until the entire action has completed.
-     *
-     * @param versionIdentifier the identifier that indicates the version of the lock that
-     *            is held. The value that is to be passed here is the value that was returned from the
-     *            call to {@link #obtainReadLock()}.
-     * @param action the action to perform
-     *
-     * @return the value returned by the action
-     * @throws LockExpiredException if the lock has expired before the action is invoked
-     * @throws Exception any Exception thrown by the given action is propagated
-     */
-    <T> T withReadLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
-
-    /**
-     * Releases the read lock held on this flow
-     *
-     * @param versionIdentifier the identifier that indicates the version of the lock that
-     *            is held. The value that is to be passed here is the value that was returned from the
-     *            call to {@link #obtainReadLock()}.
-     *
-     * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
-     */
-    void releaseReadLock(String versionIdentifier) throws LockExpiredException;
-
-    /**
-     * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
-     * requests can be made to read or modify the flow until either this write lock
-     * is released via {@link #releaseWriteLock()} or the lock expires
-     *
-     * @return an identifier that indicates the version of the lock, so that other
-     *         requests cannot release a lock that was held by this request
-     */
-    String obtainWriteLock();
-
-    /**
-     * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
-     * requests can be made to read or modify the flow until either this write lock
-     * is released via {@link #releaseWriteLock()} or the lock expires
-     *
-     * @param versionId specifies a value to use for the Version ID for the lock
-     *
-     * @return an identifier that indicates the version of the lock, so that other
-     *         requests cannot release a lock that was held by this request
-     */
-    String obtainWriteLock(String versionId);
-
-    /**
-     * Performs the given action while holding the write lock that has already been obtained
-     * with the given versionIdentifier. This allows the given action to be performed without
-     * allowing the write lock to expire until the entire action has completed.
-     *
-     * @param versionIdentifier the identifier that indicates the version of the lock that
-     *            is held. The value that is to be passed here is the value that was returned from the
-     *            call to {@link #obtainWriteLock()}.
-     * @param action the action to perform
-     *
-     * @return the value returned by the action
-     * @throws LockExpiredException if the lock has expired before the action is invoked
-     * @throws Exception any Exception thrown by the given action is propagated
-     */
-    <T> T withWriteLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
-
-    /**
-     * Releases the write lock held on the flow
-     *
-     * @param versionIdentifier the identifier that indicates the version of the lock that
-     *            is held. The value that is to be passed here is the value that was returned from the
-     *            call to {@link #obtainWriteLock()}.
-     *
-     * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
-     */
-    void releaseWriteLock(String versionIdentifier) throws LockExpiredException;
-
-    /**
      * Claims the specified revision for the specified user.
      *
      * @param revision revision

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index 8a6b438..f1c47a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -25,6 +25,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.util.NiFiProperties;
@@ -87,12 +88,17 @@ public class StandardNiFiContentAccess implements ContentAccess {
 
             // get the target node and ensure it exists
             final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(request.getClusterNodeId());
-            final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
 
-            // replicate the request to the specific node
+            // replicate the request to the cluster coordinator, indicating the target node
             NodeResponse nodeResponse;
             try {
-                nodeResponse = requestReplicator.replicate(targetNodes, HttpMethod.GET, dataUri, parameters, headers).awaitMergedResponse();
+                headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+                final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+                if (coordinatorNode == null) {
+                    throw new NoClusterCoordinatorException();
+                }
+                final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
+                nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse();
             } catch (InterruptedException e) {
                 throw new IllegalClusterStateException("Interrupted while waiting for a response from node");
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 3526f32..490a9bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -183,8 +183,6 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
-import org.apache.nifi.web.concurrent.DistributedLockingManager;
-import org.apache.nifi.web.concurrent.LockExpiredException;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
@@ -258,54 +256,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private Authorizer authorizer;
 
     private AuthorizableLookup authorizableLookup;
-    private DistributedLockingManager lockManager;
 
     // -----------------------------------------
     // Synchronization methods
     // -----------------------------------------
     @Override
-    public String obtainReadLock() {
-        return lockManager.getReadLock().lock();
-    }
-
-    @Override
-    public String obtainReadLock(final String versionIdSeed) {
-        return lockManager.getReadLock().lock(versionIdSeed);
-    }
-
-    @Override
-    public <T> T withReadLock(final String versionIdentifier, final Supplier<T> action) throws LockExpiredException {
-        return lockManager.getReadLock().withLock(versionIdentifier, action);
-    }
-
-    @Override
-    public void releaseReadLock(final String versionIdentifier) throws LockExpiredException {
-        lockManager.getReadLock().unlock(versionIdentifier);
-    }
-
-    @Override
-    public String obtainWriteLock() {
-        return lockManager.getWriteLock().lock();
-    }
-
-    @Override
-    public String obtainWriteLock(final String versionIdSeed) {
-        return lockManager.getWriteLock().lock(versionIdSeed);
-    }
-
-    @Override
-    public <T> T withWriteLock(final String versionIdentifier, final Supplier<T> action) throws LockExpiredException {
-        return lockManager.getWriteLock().withLock(versionIdentifier, action);
-    }
-
-    @Override
-    public void releaseWriteLock(final String versionIdentifier) throws LockExpiredException {
-        lockManager.getWriteLock().unlock(versionIdentifier);
-    }
-
-
-
-    @Override
     public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
         authorizeAccess.authorize(authorizableLookup);
     }
@@ -2900,8 +2855,4 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     public void setBulletinRepository(final BulletinRepository bulletinRepository) {
         this.bulletinRepository = bulletinRepository;
     }
-
-    public void setLockManager(final DistributedLockingManager lockManager) {
-        this.lockManager = lockManager;
-    }
 }