You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/02 15:25:05 UTC
[6/6] incubator-nifi git commit: NIFI-250: - Updating the
ControllerService endpoint to specify the availability of the service. -
Updating the optimistic locking manager to ensure the proper revision is
checked.
NIFI-250:
- Updating the ControllerService endpoint to specify the availability of the service.
- Updating the optimistic locking manager to ensure the proper revision is checked.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/22822d33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/22822d33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/22822d33
Branch: refs/heads/NIFI-250
Commit: 22822d33a2159d1f9a2b12caafbce7c2b46ebe52
Parents: 346cc0c
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon Feb 2 09:10:12 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Feb 2 09:10:12 2015 -0500
----------------------------------------------------------------------
.../cluster/manager/impl/WebClusterManager.java | 10 +-
.../spring/WebClusterManagerFactoryBean.java | 10 +-
.../resources/nifi-cluster-manager-context.xml | 4 +
.../nifi/web/api/ControllerServiceResource.java | 99 +++++++++++++++-----
.../dao/impl/StandardControllerServiceDAO.java | 1 -
.../OptimisticLockingManagerFactoryBean.java | 74 +++++++++++++++
.../src/main/resources/nifi-web-api-context.xml | 8 +-
7 files changed, 172 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index fec8e0c..585d151 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -210,6 +210,7 @@ import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.web.OptimisticLockingManager;
/**
* Provides a cluster manager implementation. The manager federates incoming
@@ -303,6 +304,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final HttpResponseMapper httpResponseMapper;
private final DataFlowManagementService dataFlowManagementService;
private final ClusterManagerProtocolSenderListener senderListener;
+ private final OptimisticLockingManager optimisticLockingManager;
private final StringEncryptor encryptor;
private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
@@ -315,7 +317,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// null means the dataflow should be read from disk
private StandardDataFlow cachedDataFlow = null;
private NodeIdentifier primaryNodeId = null;
- private Revision revision = new Revision(0L, "");
private Timer heartbeatMonitor;
private Timer heartbeatProcessor;
private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
@@ -336,7 +337,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
- final NiFiProperties properties, final StringEncryptor encryptor) {
+ final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
if (httpRequestReplicator == null) {
throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
@@ -360,6 +361,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
this.instanceId = UUID.randomUUID().toString();
this.senderListener = senderListener;
this.encryptor = encryptor;
+ this.optimisticLockingManager = optimisticLockingManager;
senderListener.addHandler(this);
senderListener.setBulletinRepository(bulletinRepository);
@@ -2135,7 +2137,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final ClusterContext clusterCtx = new ClusterContextImpl();
clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
- clusterCtx.setRevision(revision);
+ clusterCtx.setRevision(optimisticLockingManager.getRevision());
// serialize cluster context and add to request header
final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
@@ -2804,7 +2806,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
- revision = clusterContext.getRevision();
+ optimisticLockingManager.setRevision(clusterContext.getRevision());
}
}
} catch (final ClassNotFoundException cnfe) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index 3881461..86bc07c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -29,6 +29,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.OptimisticLockingManager;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
@@ -48,6 +49,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
private NiFiProperties properties;
private StringEncryptor encryptor;
+
+ private OptimisticLockingManager optimisticLockingManager;
@Override
public Object getObject() throws Exception {
@@ -79,7 +82,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
dataFlowService,
senderListener,
properties,
- encryptor
+ encryptor,
+ optimisticLockingManager
);
// set the service broadcaster
@@ -130,4 +134,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}
+
+ public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
+ this.optimisticLockingManager = optimisticLockingManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 68c29bc..72c7bff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -91,10 +91,14 @@
<property name="properties" ref="nifiProperties"/>
</bean>
+ <!-- cluster manager optimistic locking manager -->
+ <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
+
<!-- cluster manager -->
<bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="encryptor" ref="stringEncryptor"/>
+ <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/>
</bean>
<!-- discoverable services -->
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 83d9abd..4b36f28 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -51,6 +51,7 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.Availability;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
@@ -93,21 +94,48 @@ public class ControllerServiceResource extends ApplicationResource {
}
/**
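+ * Parses the availability and ensures that the specified availability makes sense for the
+ * given NiFi instance.
+ *
+ * @param availability the availability path segment to parse, matched case-insensitively against the Availability values
+ * @return the parsed Availability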
+ */
+ private Availability parseAvailability(final String availability) {
+ final Availability avail;
+ try {
+ avail = Availability.valueOf(availability.toUpperCase());
+ } catch (IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format("Availability: Value must be one of [%s]", StringUtils.join(Availability.values(), ", ")));
+ }
+
+ // ensure this nifi is an NCM if specifying NCM availability
+ if (!properties.isClusterManager() && Availability.NCM.equals(avail)) {
+ throw new IllegalArgumentException("Availability of NCM is only applicable when the NiFi instance is the cluster manager.");
+ }
+
+ return avail;
+ }
+
+ /**
* Retrieves all the of controller services in this NiFi.
*
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @return A controllerServicesEntity.
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("{availability}")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ControllerServicesEntity.class)
- public Response getControllerServices(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
-
+ public Response getControllerServices(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("availability") String availability) {
+ final Availability avail = parseAvailability(availability);
+
// replicate if cluster manager
- if (properties.isClusterManager()) {
+ if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
@@ -136,20 +164,24 @@ public class ControllerServiceResource extends ApplicationResource {
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param type The type of controller service to create.
* @return A controllerServiceEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("{availability}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ControllerServiceEntity.class)
public Response createControllerService(
@Context HttpServletRequest httpServletRequest,
@FormParam(VERSION) LongParameter version,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @PathParam("availability") String availability,
@FormParam("type") String type) {
-
+
// create the controller service DTO
final ControllerServiceDTO controllerServiceDTO = new ControllerServiceDTO();
controllerServiceDTO.setType(type);
@@ -166,24 +198,30 @@ public class ControllerServiceResource extends ApplicationResource {
controllerServiceEntity.setRevision(revision);
controllerServiceEntity.setControllerService(controllerServiceDTO);
- return createControllerService(httpServletRequest, controllerServiceEntity);
+ return createControllerService(httpServletRequest, availability, controllerServiceEntity);
}
/**
* Creates a new Controller Service.
*
* @param httpServletRequest
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param controllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
*/
@POST
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("{availability}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ControllerServiceEntity.class)
public Response createControllerService(
@Context HttpServletRequest httpServletRequest,
+ @PathParam("availability") String availability,
ControllerServiceEntity controllerServiceEntity) {
+
+ final Availability avail = parseAvailability(availability);
if (controllerServiceEntity == null || controllerServiceEntity.getControllerService()== null) {
throw new IllegalArgumentException("Controller service details must be specified.");
@@ -201,7 +239,7 @@ public class ControllerServiceResource extends ApplicationResource {
}
// if cluster manager, convert POST to PUT (to maintain same ID across nodes) and replicate
- if (properties.isClusterManager()) {
+ if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
// apply action to the cluster manager first
serviceFacade.createControllerService(new Revision(revision.getVersion(), revision.getClientId()), controllerServiceEntity.getControllerService());
@@ -260,18 +298,23 @@ public class ControllerServiceResource extends ApplicationResource {
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param id The id of the controller service to retrieve
* @return A controllerServiceEntity.
*/
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("{id}")
+ @Path("{availability}/{id}")
@PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
@TypeHint(ControllerServiceEntity.class)
- public Response getControllerService(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
+ public Response getControllerService(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @PathParam("availability") String availability, @PathParam("id") String id) {
+ final Availability avail = parseAvailability(availability);
+
// replicate if cluster manager
- if (properties.isClusterManager()) {
+ if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
@@ -299,6 +342,8 @@ public class ControllerServiceResource extends ApplicationResource {
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param id The id of the controller service to update.
* @param name The name of the controller service
* @param annotationData The annotation data for the controller service
@@ -324,20 +369,20 @@ public class ControllerServiceResource extends ApplicationResource {
@PUT
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("{id}")
+ @Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ControllerServiceEntity.class)
public Response updateControllerService(
@Context HttpServletRequest httpServletRequest,
@FormParam(VERSION) LongParameter version,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @PathParam("id") String id, @FormParam("name") String name,
+ @PathParam("availability") String availability, @PathParam("id") String id, @FormParam("name") String name,
@FormParam("annotationData") String annotationData, @FormParam("comments") String comments,
@FormParam("enabled") Boolean enabled, @FormParam("markedForDeletion[]") List<String> markedForDeletion,
MultivaluedMap<String, String> formParams) {
// create collections for holding the controller service properties
- final Map<String, String> properties = new LinkedHashMap<>();
+ final Map<String, String> updatedProperties = new LinkedHashMap<>();
// go through each parameter and look for processor properties
for (String parameterName : formParams.keySet()) {
@@ -349,7 +394,7 @@ public class ControllerServiceResource extends ApplicationResource {
final int endIndex = StringUtils.lastIndexOf(parameterName, "]");
if (startIndex != -1 && endIndex != -1) {
final String propertyName = StringUtils.substring(parameterName, startIndex + 1, endIndex);
- properties.put(propertyName, formParams.getFirst(parameterName));
+ updatedProperties.put(propertyName, formParams.getFirst(parameterName));
}
}
}
@@ -357,7 +402,7 @@ public class ControllerServiceResource extends ApplicationResource {
// set the properties to remove
for (String propertyToDelete : markedForDeletion) {
- properties.put(propertyToDelete, null);
+ updatedProperties.put(propertyToDelete, null);
}
// create the controller service DTO
@@ -369,8 +414,8 @@ public class ControllerServiceResource extends ApplicationResource {
controllerServiceDTO.setEnabled(enabled);
// only set the properties when appropriate
- if (!properties.isEmpty()) {
- controllerServiceDTO.setProperties(properties);
+ if (!updatedProperties.isEmpty()) {
+ controllerServiceDTO.setProperties(updatedProperties);
}
// create the revision
@@ -386,13 +431,15 @@ public class ControllerServiceResource extends ApplicationResource {
controllerServiceEntity.setControllerService(controllerServiceDTO);
// update the controller service
- return updateControllerService(httpServletRequest, id, controllerServiceEntity);
+ return updateControllerService(httpServletRequest, availability, id, controllerServiceEntity);
}
/**
* Updates the specified a new Controller Service.
*
* @param httpServletRequest
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param id The id of the controller service to update.
* @param controllerServiceEntity A controllerServiceEntity.
* @return A controllerServiceEntity.
@@ -400,14 +447,17 @@ public class ControllerServiceResource extends ApplicationResource {
@PUT
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("{id}")
+ @Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ControllerServiceEntity.class)
public Response updateControllerService(
@Context HttpServletRequest httpServletRequest,
+ @PathParam("availability") String availability,
@PathParam("id") String id,
ControllerServiceEntity controllerServiceEntity) {
+ final Availability avail = parseAvailability(availability);
+
if (controllerServiceEntity == null || controllerServiceEntity.getControllerService()== null) {
throw new IllegalArgumentException("Controller service details must be specified.");
}
@@ -424,7 +474,7 @@ public class ControllerServiceResource extends ApplicationResource {
}
// replicate if cluster manager
- if (properties.isClusterManager()) {
+ if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
// change content type to JSON for serializing entity
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
@@ -471,22 +521,26 @@ public class ControllerServiceResource extends ApplicationResource {
* @param clientId Optional client id. If the client id is not specified, a
* new one will be generated. This value (whether specified or generated) is
* included in the response.
+ * @param availability Whether the controller service is available on the NCM only (ncm) or on the
+ * nodes only (node). If this instance is not clustered all services should use the node availability.
* @param id The id of the controller service to remove.
* @return A entity containing the client id and an updated revision.
*/
@DELETE
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("{id}")
+ @Path("{availability}/{id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@TypeHint(ControllerServiceEntity.class)
public Response removeControllerService(
@Context HttpServletRequest httpServletRequest,
@QueryParam(VERSION) LongParameter version,
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @PathParam("id") String id) {
+ @PathParam("availability") String availability, @PathParam("id") String id) {
+ final Availability avail = parseAvailability(availability);
+
// replicate if cluster manager
- if (properties.isClusterManager()) {
+ if (properties.isClusterManager() && Availability.NODE.equals(avail)) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
@@ -519,6 +573,7 @@ public class ControllerServiceResource extends ApplicationResource {
}
// setters
+
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index ccc8023..551dcd3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -216,7 +216,6 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
*/
private void configureControllerService(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
final String name = controllerServiceDTO.getName();
- final String availability = controllerServiceDTO.getAvailability();
final String annotationData = controllerServiceDTO.getAnnotationData();
final String comments = controllerServiceDTO.getComments();
final Map<String, String> properties = controllerServiceDTO.getProperties();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
new file mode 100644
index 0000000..850d2fd
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/OptimisticLockingManagerFactoryBean.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.spring;
+
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.ClusterAwareOptimisticLockingManager;
+import org.apache.nifi.web.ClusterRequestException;
+import org.apache.nifi.web.OptimisticLockingManager;
+import org.apache.nifi.web.StandardOptimisticLockingManager;
+import org.apache.nifi.web.dao.ControllerServiceDAO;
+import org.apache.nifi.web.dao.impl.StandardControllerServiceDAO;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
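+ * Provides the OptimisticLockingManager: the clusterManagerOptimisticLockingManager bean on a cluster manager, otherwise a ClusterAwareOptimisticLockingManager wrapping a StandardOptimisticLockingManager.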
+ */
+public class OptimisticLockingManagerFactoryBean implements FactoryBean, ApplicationContextAware {
+
+ private ApplicationContext context;
+ private OptimisticLockingManager optimisticLockingManager;
+ private NiFiProperties properties;
+
+ @Override
+ public Object getObject() throws Exception {
+ if (optimisticLockingManager == null) {
+ if (properties.isClusterManager()) {
+ optimisticLockingManager = context.getBean("clusterManagerOptimisticLockingManager", OptimisticLockingManager.class);
+ } else {
+ optimisticLockingManager = new ClusterAwareOptimisticLockingManager(new StandardOptimisticLockingManager());
+ }
+ }
+
+ return optimisticLockingManager;
+ }
+
+ @Override
+ public Class getObjectType() {
+ return OptimisticLockingManager.class;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public void setProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext context) throws BeansException {
+ this.context = context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/22822d33/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 8bc4fdd..61b68c5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -25,11 +25,7 @@
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd">
<!-- optimistic locking manager -->
- <bean id="optimisticLockingManager" class="org.apache.nifi.web.ClusterAwareOptimisticLockingManager">
- <constructor-arg>
- <bean class="org.apache.nifi.web.StandardOptimisticLockingManager" />
- </constructor-arg>
- </bean>
+ <bean id="webOptimisticLockingManager" class="org.apache.nifi.web.spring.OptimisticLockingManagerFactoryBean" depends-on="clusterManagerOptimisticLockingManager"/>
<!-- dto factory -->
<bean id="dtoFactory" class="org.apache.nifi.web.api.dto.DtoFactory">
@@ -103,7 +99,7 @@
<property name="auditService" ref="auditService"/>
<property name="userService" ref="userService"/>
<property name="snippetUtils" ref="snippetUtils"/>
- <property name="optimisticLockingManager" ref="optimisticLockingManager"/>
+ <property name="optimisticLockingManager" ref="webOptimisticLockingManager"/>
<property name="dtoFactory" ref="dtoFactory"/>
<property name="clusterManager" ref="clusterManager"/>
</bean>