You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/02 15:25:05 UTC

[6/6] incubator-nifi git commit: NIFI-250: - Updating the ControllerService endpoint to specify the availability of the service. - Updating the optimistic locking manager to ensure the proper revision is checked.

NIFI-250:
- Updating the ControllerService endpoint to specify the availability of the service.
- Updating the optimistic locking manager to ensure the proper revision is checked.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/22822d33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/22822d33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/22822d33

Branch: refs/heads/NIFI-250
Commit: 22822d33a2159d1f9a2b12caafbce7c2b46ebe52
Parents: 346cc0c
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon Feb 2 09:10:12 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Feb 2 09:10:12 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 10 +-
 .../spring/WebClusterManagerFactoryBean.java    | 10 +-
 .../resources/nifi-cluster-manager-context.xml  |  4 +
 .../nifi/web/api/ControllerServiceResource.java | 99 +++++++++++++++-----
 .../dao/impl/StandardControllerServiceDAO.java  |  1 -
 .../OptimisticLockingManagerFactoryBean.java    | 74 +++++++++++++++
 .../src/main/resources/nifi-web-api-context.xml |  8 +-
 7 files changed, 172 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index fec8e0c..585d151 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -210,6 +210,7 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.web.OptimisticLockingManager;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming
@@ -303,6 +304,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     private final HttpResponseMapper httpResponseMapper;
     private final DataFlowManagementService dataFlowManagementService;
     private final ClusterManagerProtocolSenderListener senderListener;
+    private final OptimisticLockingManager optimisticLockingManager;
     private final StringEncryptor encryptor;
     private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
     private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
@@ -315,7 +317,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     // null means the dataflow should be read from disk
     private StandardDataFlow cachedDataFlow = null;
     private NodeIdentifier primaryNodeId = null;
-    private Revision revision = new Revision(0L, "");
     private Timer heartbeatMonitor;
     private Timer heartbeatProcessor;
     private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
@@ -336,7 +337,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
             final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
-            final NiFiProperties properties, final StringEncryptor encryptor) {
+            final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
 
         if (httpRequestReplicator == null) {
             throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
@@ -360,6 +361,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         this.instanceId = UUID.randomUUID().toString();
         this.senderListener = senderListener;
         this.encryptor = encryptor;
+        this.optimisticLockingManager = optimisticLockingManager;
         senderListener.addHandler(this);
         senderListener.setBulletinRepository(bulletinRepository);
 
@@ -2135,7 +2137,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
         final ClusterContext clusterCtx = new ClusterContextImpl();
         clusterCtx.setRequestSentByClusterManager(true);                 // indicate request is sent from cluster manager
-        clusterCtx.setRevision(revision);
+        clusterCtx.setRevision(optimisticLockingManager.getRevision());
 
         // serialize cluster context and add to request header
         final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
@@ -2804,7 +2806,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                                     }
                                 }
                             }
-                            revision = clusterContext.getRevision();
+                            optimisticLockingManager.setRevision(clusterContext.getRevision());
                         }
                     }
                 } catch (final ClassNotFoundException cnfe) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 3881461..86bc07c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -29,6 +29,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.OptimisticLockingManager;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.ApplicationContext;
@@ -48,6 +49,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
     private NiFiProperties properties;
 
     private StringEncryptor encryptor;
+    
+    private OptimisticLockingManager optimisticLockingManager;
 
     @Override
     public Object getObject() throws Exception {
@@ -79,7 +82,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
                     dataFlowService,
                     senderListener,
                     properties,
-                    encryptor
+                    encryptor,
+                    optimisticLockingManager
             );
 
             // set the service broadcaster
@@ -130,4 +134,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
     public void setEncryptor(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
     }
+    
+    public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
+        this.optimisticLockingManager = optimisticLockingManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 68c29bc..72c7bff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -91,10 +91,14 @@
         <property name="properties" ref="nifiProperties"/>
     </bean>
 
+    <!-- cluster manager optimistic locking manager -->
+    <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
+
     <!-- cluster manager -->
     <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
         <property name="properties" ref="nifiProperties"/>
         <property name="encryptor" ref="stringEncryptor"/>
+        <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/>
     </bean>
     
     <!-- discoverable services -->

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 83d9abd..4b36f28 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -51,6 +51,7 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
@@ -93,21 +94,48 @@ public class ControllerServiceResource extends ApplicationResource {
     }
 
     /**
+     * Parses the availability and ensures that the specified availability makes sense for the
+     * given NiFi instance.
+     * @param availability name of the availability; upper-cased and matched against the Availability constants
+     * @return the parsed Availability
+     * @throws IllegalArgumentException if the name matches no constant, or NCM is requested on an instance that is not the cluster manager
+     */
+    private Availability parseAvailability(final String availability) {
+        final Availability avail;
+        try {
+            avail = Availability.valueOf(availability.toUpperCase());
+        } catch (IllegalArgumentException iae) {
+            throw new IllegalArgumentException(String.format("Availability: Value must be one of [%s]", StringUtils.join(Availability.values(), ", ")), iae);
+        }
+        
+        // ensure this nifi is an NCM if specifying NCM availability
+        if (!properties.isClusterManager() && Availability.NCM.equals(avail)) {
+            throw new IllegalArgumentException("Availability of NCM is only applicable when the NiFi instance is the cluster manager.");
+        }
+        
+        return avail;
+    }
+    
+    /**
      * Retrieves all the of controller services in this NiFi.
      *
      * @param clientId Optional client id. If the client id is not specified, a
      * new one will be generated. This value (whether specified or generated) is
      * included in the response.
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @return A controllerServicesEntity.
      */
     @GET
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("{availability}")
     @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
     @TypeHint(ControllerServicesEntity.class)
-    public Response getControllerServices(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
-
+    public Response getControllerServices(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("availability") String availability) {
+        final Availability avail = parseAvailability(availability);
+        
         // replicate if cluster manager
-        if (properties.isClusterManager()) {
+        if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
             return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
@@ -136,20 +164,24 @@ public class ControllerServiceResource extends ApplicationResource {
      * @param clientId Optional client id. If the client id is not specified, a
      * new one will be generated. This value (whether specified or generated) is
      * included in the response.
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param type The type of controller service to create.
      * @return A controllerServiceEntity.
      */
     @POST
     @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("{availability}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @TypeHint(ControllerServiceEntity.class)
     public Response createControllerService(
             @Context HttpServletRequest httpServletRequest,
             @FormParam(VERSION) LongParameter version,
             @FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+            @PathParam("availability") String availability, 
             @FormParam("type") String type) {
-
+        
         // create the controller service DTO
         final ControllerServiceDTO controllerServiceDTO = new ControllerServiceDTO();
         controllerServiceDTO.setType(type);
@@ -166,24 +198,30 @@ public class ControllerServiceResource extends ApplicationResource {
         controllerServiceEntity.setRevision(revision);
         controllerServiceEntity.setControllerService(controllerServiceDTO);
 
-        return createControllerService(httpServletRequest, controllerServiceEntity);
+        return createControllerService(httpServletRequest, availability, controllerServiceEntity);
     }
 
     /**
      * Creates a new Controller Service.
      *
      * @param httpServletRequest
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param controllerServiceEntity A controllerServiceEntity.
      * @return A controllerServiceEntity.
      */
     @POST
     @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("{availability}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @TypeHint(ControllerServiceEntity.class)
     public Response createControllerService(
             @Context HttpServletRequest httpServletRequest,
+            @PathParam("availability") String availability, 
             ControllerServiceEntity controllerServiceEntity) {
+        
+        final Availability avail = parseAvailability(availability);
 
         if (controllerServiceEntity == null || controllerServiceEntity.getControllerService()== null) {
             throw new IllegalArgumentException("Controller service details must be specified.");
@@ -201,7 +239,7 @@ public class ControllerServiceResource extends ApplicationResource {
         }
 
         // if cluster manager, convert POST to PUT (to maintain same ID across nodes) and replicate
-        if (properties.isClusterManager()) {
+        if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
 
             // apply action to the cluster manager first
             serviceFacade.createControllerService(new Revision(revision.getVersion(), revision.getClientId()), controllerServiceEntity.getControllerService());
@@ -260,18 +298,23 @@ public class ControllerServiceResource extends ApplicationResource {
      * @param clientId Optional client id. If the client id is not specified, a
      * new one will be generated. This value (whether specified or generated) is
      * included in the response.
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param id The id of the controller service to retrieve
      * @return A controllerServiceEntity.
      */
     @GET
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("{id}")
+    @Path("{availability}/{id}")
     @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
     @TypeHint(ControllerServiceEntity.class)
-    public Response getControllerService(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
+    public Response getControllerService(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, 
+            @PathParam("availability") String availability, @PathParam("id") String id) {
 
+        final Availability avail = parseAvailability(availability);
+        
         // replicate if cluster manager
-        if (properties.isClusterManager()) {
+        if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
             return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
@@ -299,6 +342,8 @@ public class ControllerServiceResource extends ApplicationResource {
      * @param clientId Optional client id. If the client id is not specified, a
      * new one will be generated. This value (whether specified or generated) is
      * included in the response.
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param id The id of the controller service to update.
      * @param name The name of the controller service
      * @param annotationData The annotation data for the controller service
@@ -324,20 +369,20 @@ public class ControllerServiceResource extends ApplicationResource {
     @PUT
     @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("{id}")
+    @Path("{availability}/{id}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @TypeHint(ControllerServiceEntity.class)
     public Response updateControllerService(
             @Context HttpServletRequest httpServletRequest,
             @FormParam(VERSION) LongParameter version,
             @FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @PathParam("id") String id, @FormParam("name") String name,
+            @PathParam("availability") String availability, @PathParam("id") String id, @FormParam("name") String name,
             @FormParam("annotationData") String annotationData, @FormParam("comments") String comments,
             @FormParam("enabled") Boolean enabled, @FormParam("markedForDeletion[]") List<String> markedForDeletion,
             MultivaluedMap<String, String> formParams) {
 
         // create collections for holding the controller service properties
-        final Map<String, String> properties = new LinkedHashMap<>();
+        final Map<String, String> updatedProperties = new LinkedHashMap<>();
         
         // go through each parameter and look for processor properties
         for (String parameterName : formParams.keySet()) {
@@ -349,7 +394,7 @@ public class ControllerServiceResource extends ApplicationResource {
                     final int endIndex = StringUtils.lastIndexOf(parameterName, "]");
                     if (startIndex != -1 && endIndex != -1) {
                         final String propertyName = StringUtils.substring(parameterName, startIndex + 1, endIndex);
-                        properties.put(propertyName, formParams.getFirst(parameterName));
+                        updatedProperties.put(propertyName, formParams.getFirst(parameterName));
                     }
                 }
             }
@@ -357,7 +402,7 @@ public class ControllerServiceResource extends ApplicationResource {
         
         // set the properties to remove
         for (String propertyToDelete : markedForDeletion) {
-            properties.put(propertyToDelete, null);
+            updatedProperties.put(propertyToDelete, null);
         }
         
         // create the controller service DTO
@@ -369,8 +414,8 @@ public class ControllerServiceResource extends ApplicationResource {
         controllerServiceDTO.setEnabled(enabled);
 
         // only set the properties when appropriate
-        if (!properties.isEmpty()) {
-            controllerServiceDTO.setProperties(properties);
+        if (!updatedProperties.isEmpty()) {
+            controllerServiceDTO.setProperties(updatedProperties);
         }
         
         // create the revision
@@ -386,13 +431,15 @@ public class ControllerServiceResource extends ApplicationResource {
         controllerServiceEntity.setControllerService(controllerServiceDTO);
 
         // update the controller service
-        return updateControllerService(httpServletRequest, id, controllerServiceEntity);
+        return updateControllerService(httpServletRequest, availability, id, controllerServiceEntity);
     }
 
     /**
      * Updates the specified a new Controller Service.
      *
      * @param httpServletRequest
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param id The id of the controller service to update.
      * @param controllerServiceEntity A controllerServiceEntity.
      * @return A controllerServiceEntity.
@@ -400,14 +447,17 @@ public class ControllerServiceResource extends ApplicationResource {
     @PUT
     @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("{id}")
+    @Path("{availability}/{id}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @TypeHint(ControllerServiceEntity.class)
     public Response updateControllerService(
             @Context HttpServletRequest httpServletRequest,
+            @PathParam("availability") String availability, 
             @PathParam("id") String id,
             ControllerServiceEntity controllerServiceEntity) {
 
+        final Availability avail = parseAvailability(availability);
+        
         if (controllerServiceEntity == null || controllerServiceEntity.getControllerService()== null) {
             throw new IllegalArgumentException("Controller service details must be specified.");
         }
@@ -424,7 +474,7 @@ public class ControllerServiceResource extends ApplicationResource {
         }
 
         // replicate if cluster manager
-        if (properties.isClusterManager()) {
+        if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
             // change content type to JSON for serializing entity
             final Map<String, String> headersToOverride = new HashMap<>();
             headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
@@ -471,22 +521,26 @@ public class ControllerServiceResource extends ApplicationResource {
      * @param clientId Optional client id. If the client id is not specified, a
      * new one will be generated. This value (whether specified or generated) is
      * included in the response.
+     * @param availability Whether the controller service is available on the NCM only (ncm) or on the 
+     * nodes only (node). If this instance is not clustered all services should use the node availability.
      * @param id The id of the controller service to remove.
      * @return A entity containing the client id and an updated revision.
      */
     @DELETE
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("{id}")
+    @Path("{availability}/{id}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @TypeHint(ControllerServiceEntity.class)
     public Response removeControllerService(
             @Context HttpServletRequest httpServletRequest,
             @QueryParam(VERSION) LongParameter version,
             @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
-            @PathParam("id") String id) {
+            @PathParam("availability") String availability, @PathParam("id") String id) {
 
+        final Availability avail = parseAvailability(availability);
+        
         // replicate if cluster manager
-        if (properties.isClusterManager()) {
+        if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
             return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
@@ -519,6 +573,7 @@ public class ControllerServiceResource extends ApplicationResource {
     }
 
     // setters
+    
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index ccc8023..551dcd3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -216,7 +216,6 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
      */
     private void configureControllerService(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
         final String name = controllerServiceDTO.getName();
-        final String availability = controllerServiceDTO.getAvailability();
         final String annotationData = controllerServiceDTO.getAnnotationData();
         final String comments = controllerServiceDTO.getComments();
         final Map<String, String> properties = controllerServiceDTO.getProperties();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
new file mode 100644
index 0000000..850d2fd
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.spring;
+
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.ClusterAwareOptimisticLockingManager;
+import org.apache.nifi.web.ClusterRequestException;
+import org.apache.nifi.web.OptimisticLockingManager;
+import org.apache.nifi.web.StandardOptimisticLockingManager;
+import org.apache.nifi.web.dao.ControllerServiceDAO;
+import org.apache.nifi.web.dao.impl.StandardControllerServiceDAO;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Spring factory bean that supplies the OptimisticLockingManager: the context bean named "clusterManagerOptimisticLockingManager" when this instance is the cluster manager, otherwise a ClusterAwareOptimisticLockingManager wrapping a new StandardOptimisticLockingManager.
+ */
+public class OptimisticLockingManagerFactoryBean implements FactoryBean, ApplicationContextAware {
+
+    private ApplicationContext context;
+    private OptimisticLockingManager optimisticLockingManager; // created on the first getObject() call, then reused
+    private NiFiProperties properties; // must be set via setProperties before getObject() is called; it is dereferenced there
+
+    @Override
+    public Object getObject() throws Exception {
+        if (optimisticLockingManager == null) { // build once; later calls return the same instance
+            if (properties.isClusterManager()) { // cluster manager: look the manager up in the application context by bean name
+                optimisticLockingManager = context.getBean("clusterManagerOptimisticLockingManager", OptimisticLockingManager.class);
+            } else { // otherwise: wrap a new standard manager in the cluster-aware implementation
+                optimisticLockingManager = new ClusterAwareOptimisticLockingManager(new StandardOptimisticLockingManager());
+            }
+        }
+
+        return optimisticLockingManager;
+    }
+
+    @Override
+    public Class getObjectType() {
+        return OptimisticLockingManager.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext context) throws BeansException {
+        this.context = context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 8bc4fdd..61b68c5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -25,11 +25,7 @@
     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
 
     <!-- optimistic locking manager -->
-    <bean id="optimisticLockingManager" class="org.apache.nifi.web.ClusterAwareOptimisticLockingManager">
-        <constructor-arg>
-            <bean class="org.apache.nifi.web.StandardOptimisticLockingManager" />
-        </constructor-arg>
-    </bean>
+    <bean id="webOptimisticLockingManager" class="org.apache.nifi.web.spring.OptimisticLockingManagerFactoryBean" depends-on="clusterManagerOptimisticLockingManager"><property name="properties" ref="nifiProperties"/></bean>
 
     <!-- dto factory -->
     <bean id="dtoFactory" class="org.apache.nifi.web.api.dto.DtoFactory">
@@ -103,7 +99,7 @@
         <property name="auditService" ref="auditService"/>
         <property name="userService" ref="userService"/>
         <property name="snippetUtils" ref="snippetUtils"/>
-        <property name="optimisticLockingManager" ref="optimisticLockingManager"/>
+        <property name="optimisticLockingManager" ref="webOptimisticLockingManager"/>
         <property name="dtoFactory" ref="dtoFactory"/>
         <property name="clusterManager" ref="clusterManager"/>
     </bean>