You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:20 UTC
[3/9] nifi git commit: NIFI-4224: - Initial implementation of Process
Group level Variable Registry - Updated to incorporate PR Feedback - Changed
log message because slf4j-simple apparently has a memory leak;
passing a String instead of passing in the C
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 36a9524..35686a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,31 @@
*/
package org.apache.nifi.web;
-import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@@ -84,6 +108,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
@@ -92,6 +117,7 @@ import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
@@ -137,6 +163,7 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.UserDTO;
import org.apache.nifi.web.api.dto.UserGroupDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -157,6 +184,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
@@ -190,6 +218,8 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -217,28 +247,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -423,6 +432,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
+ processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds);
+ }
+
+ @Override
public void verifyDeleteProcessGroup(final String groupId) {
processGroupDAO.verifyDelete(groupId);
}
@@ -624,6 +638,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
*/
private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ return updateComponent(user, revision, authorizable, daoUpdate, dtoCreation);
+ }
+
+ private <D, C> RevisionUpdate<D> updateComponent(final NiFiUser user, final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
try {
final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
@Override
@@ -774,6 +792,81 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public Set<AffectedComponentDTO> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+ if (group == null) {
+ throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
+ }
+
+ final Map<String, String> variableMap = new HashMap<>();
+ variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
+ .map(VariableEntity::getVariable)
+ .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+ final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>();
+
+ final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
+ for (final String variableName : updatedVariableNames) {
+ final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableName);
+
+ for (final ConfiguredComponent component : affectedComponents) {
+ if (component instanceof ProcessorNode) {
+ final ProcessorNode procNode = (ProcessorNode) component;
+ if (procNode.isRunning()) {
+ affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
+ }
+ } else if (component instanceof ControllerServiceNode) {
+ final ControllerServiceNode serviceNode = (ControllerServiceNode) component;
+ if (serviceNode.isActive()) {
+ affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
+ }
+ } else {
+ throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
+ }
+ }
+ }
+
+ return affectedComponentDtos;
+ }
+
+ private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
+ final Set<String> updatedVariableNames = new HashSet<>();
+
+ final ComponentVariableRegistry registry = group.getVariableRegistry();
+ for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
+ final String varName = entry.getKey();
+ final String newValue = entry.getValue();
+
+ final String curValue = registry.getVariableValue(varName);
+ if (!Objects.equals(newValue, curValue)) {
+ updatedVariableNames.add(varName);
+ }
+ }
+
+ return updatedVariableNames;
+ }
+
+
+ @Override
+ public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
+ return updateVariableRegistry(NiFiUserUtils.getNiFiUser(), revision, variableRegistryDto);
+ }
+
+ @Override
+ public VariableRegistryEntity updateVariableRegistry(NiFiUser user, Revision revision, VariableRegistryDTO variableRegistryDto) {
+ final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+ final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(user, revision,
+ processGroupNode,
+ () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
+ processGroup -> dtoFactory.createVariableRegistryDto(processGroup));
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+ return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions);
+ }
+
+
+ @Override
public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
@@ -790,14 +883,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
+ if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
+ processGroupDAO.verifyUpdate(processGroupDTO);
+ }
+ }
+
+ @Override
public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ return scheduleComponents(user, processGroupId, state, componentRevisions);
+ }
+
+ @Override
+ public ScheduleComponentsEntity scheduleComponents(final NiFiUser user, final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
+
final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
UpdateRevisionTask<ScheduleComponentsEntity>() {
@Override
public RevisionUpdate<ScheduleComponentsEntity> update() {
// schedule the components
- processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
+ processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
// update the revisions
final Map<String, Revision> updatedRevisions = new HashMap<>();
@@ -821,6 +927,46 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ return activateControllerServices(user, processGroupId, state, serviceRevisions);
+ }
+
+ @Override
+ public ActivateControllerServicesEntity activateControllerServices(final NiFiUser user, final String processGroupId, final ControllerServiceState state,
+ final Map<String, Revision> serviceRevisions) {
+
+ final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user,
+ new UpdateRevisionTask<ActivateControllerServicesEntity>() {
+ @Override
+ public RevisionUpdate<ActivateControllerServicesEntity> update() {
+ // schedule the components
+ processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
+
+ // update the revisions
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final Revision revision : serviceRevisions.values()) {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+ }
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity();
+ entity.setId(processGroupId);
+ entity.setState(state.name());
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return updatedComponent.getComponent();
+ }
+
+
+ @Override
public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent(
revision,
@@ -3062,7 +3208,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return createProcessGroupEntity(processGroup);
}
- private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
+ private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds, final NiFiUser user) {
final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
final ControllerServiceReference ref = serviceNode.getReferences();
@@ -3070,26 +3216,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, user);
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
return entityFactory.createControllerServiceEntity(dto, revision, permissions, bulletinEntities);
}
@Override
- public Set<ControllerServiceEntity> getControllerServices(final String groupId) {
- final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId);
+ public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ if (processGroup == null) {
+ throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+ }
+
+ return createVariableRegistryEntity(processGroup, includeAncestorGroups);
+ }
+
+ private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
+ final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup);
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+
+ if (includeAncestorGroups) {
+ ProcessGroup parent = processGroup.getParent();
+ while (parent != null) {
+ final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(processGroup);
+ if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
+ final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent);
+ final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables();
+ registryDto.getVariables().addAll(parentVariables);
+ }
+
+ parent = parent.getParent();
+ }
+ }
+
+ return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+ }
+
+ @Override
+ public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
+ final String groupId = variableRegistryDto.getProcessGroupId();
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ if (processGroup == null) {
+ throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+ }
+
+ final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup);
+ final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+ }
+
+ @Override
+ public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
+ return getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups, final NiFiUser user) {
+ final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
return serviceNodes.stream()
- .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
+ .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds, user))
.collect(Collectors.toSet());
}
@Override
public ControllerServiceEntity getControllerService(final String controllerServiceId) {
final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
- return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId));
+ return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser());
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 3d78741..1a50d04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -608,6 +608,11 @@ public abstract class ApplicationResource {
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevision(revision, user);
+ // verify if necessary
+ if (verifier != null) {
+ verifier.run();
+ }
+
return action.apply(revision, entity);
}
}
@@ -657,6 +662,11 @@ public abstract class ApplicationResource {
serviceFacade.authorizeAccess(authorizer);
serviceFacade.verifyRevisions(revisions, user);
+ // verify if necessary
+ if (verifier != null) {
+ verifier.run();
+ }
+
return action.apply(revisions, entity);
}
}
@@ -820,16 +830,16 @@ public abstract class ApplicationResource {
}
}
- /**
- * Replicates the request to the given node
- *
- * @param method the HTTP method
- * @param nodeUuid the UUID of the node to replicate the request to
- * @return the response from the node
- * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
- */
- protected Response replicate(final String method, final String nodeUuid) {
- return replicate(method, getRequestParameters(), nodeUuid);
+
+ private void ensureFlowInitialized() {
+ if (!flowController.isInitialized()) {
+ throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow.");
+ }
+ }
+
+ protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
+ final URI path = getAbsolutePath();
+ return replicate(path, method, entity, nodeUuid, headersToOverride);
}
/**
@@ -845,22 +855,16 @@ public abstract class ApplicationResource {
return replicate(method, entity, nodeUuid, null);
}
- private void ensureFlowInitialized() {
- if (!flowController.isInitialized()) {
- throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow.");
- }
- }
-
/**
* Replicates the request to the given node
*
- * @param method the HTTP method
- * @param entity the Entity to replicate
+ * @param method the HTTP method
+ * @param entity the Entity to replicate
* @param nodeUuid the UUID of the node to replicate the request to
* @return the response from the node
* @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
*/
- protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
+ protected Response replicate(final URI path, final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
// since we're cluster we must specify the cluster node identifier
if (nodeUuid == null) {
throw new IllegalArgumentException("The cluster node identifier must be specified.");
@@ -873,7 +877,6 @@ public abstract class ApplicationResource {
ensureFlowInitialized();
- final URI path = getAbsolutePath();
try {
final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
@@ -996,6 +999,12 @@ public abstract class ApplicationResource {
}
}
+
+ protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+ final URI path = getAbsolutePath();
+ return replicateNodeResponse(path, method, entity, headersToOverride);
+ }
+
/**
* Replicates the request to all nodes in the cluster using the provided method and entity. The headers
* used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
@@ -1009,10 +1018,9 @@ public abstract class ApplicationResource {
* @throws InterruptedException if interrupted while replicating the request
* @see #replicate(String, Object, Map)
*/
- protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+ protected NodeResponse replicateNodeResponse(final URI path, final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
ensureFlowInitialized();
- final URI path = getAbsolutePath();
final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 889676c..5d5a796 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -36,6 +36,8 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.util.NiFiProperties;
@@ -61,6 +63,7 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.entity.AboutEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.BannerEntity;
import org.apache.nifi.web.api.entity.BulletinBoardEntity;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
@@ -118,6 +121,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -388,7 +392,7 @@ public class FlowResource extends ApplicationResource {
}
// get all the controller services
- final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null);
+ final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false);
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
// create the response entity
@@ -426,11 +430,10 @@ public class FlowResource extends ApplicationResource {
}
)
public Response getControllerServicesFromGroup(
- @ApiParam(
- value = "The process group id.",
- required = true
- )
- @PathParam("id") String groupId) throws InterruptedException {
+ @ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId,
+ @ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
+ @ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups
+ ) throws InterruptedException {
authorizeFlow();
@@ -439,7 +442,7 @@ public class FlowResource extends ApplicationResource {
}
// get all the controller services
- final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId);
+ final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
// create the response entity
@@ -512,7 +515,7 @@ public class FlowResource extends ApplicationResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("process-groups/{id}")
@ApiOperation(
- value = "Schedule or unschedule comopnents in the specified Process Group.",
+ value = "Schedule or unschedule components in the specified Process Group.",
response = ScheduleComponentsEntity.class,
authorizations = {
@Authorization(value = "Read - /flow", type = ""),
@@ -570,7 +573,7 @@ public class FlowResource extends ApplicationResource {
// ensure authorized for each processor we will attempt to schedule
group.findAllProcessors().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
+ .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
.filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(processor -> {
componentIds.add(processor.getIdentifier());
@@ -578,7 +581,7 @@ public class FlowResource extends ApplicationResource {
// ensure authorized for each input port we will attempt to schedule
group.findAllInputPorts().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
.filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(inputPort -> {
componentIds.add(inputPort.getIdentifier());
@@ -586,7 +589,7 @@ public class FlowResource extends ApplicationResource {
// ensure authorized for each output port we will attempt to schedule
group.findAllOutputPorts().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
.filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(outputPort -> {
componentIds.add(outputPort.getIdentifier());
@@ -640,7 +643,129 @@ public class FlowResource extends ApplicationResource {
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the process group
- final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+ final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+ return generateOkResponse(entity).build();
+ }
+ );
+ }
+
+
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("process-groups/{id}/controller-services")
+ @ApiOperation(value = "Enable or disable Controller Services in the specified Process Group.",
+ response = ActivateControllerServicesEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /flow", type = ""),
+ @Authorization(value = "Write - /{component-type}/{uuid} - For every service being enabled/disabled", type = "")
+ })
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response activateControllerServices(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true)
+ @PathParam("id") String id,
+ @ApiParam(value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.", required = true)
+ final ActivateControllerServicesEntity requestEntity) {
+
+ // ensure the same id is being used
+ if (!id.equals(requestEntity.getId())) {
+ throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ + "not equal the process group id of the requested resource (%s).", requestEntity.getId(), id));
+ }
+
+ final ControllerServiceState state;
+ if (requestEntity.getState() == null) {
+ throw new IllegalArgumentException("The controller service state must be specified.");
+ } else {
+ try {
+ state = ControllerServiceState.valueOf(requestEntity.getState());
+ } catch (final IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format("The controller service state must be one of [%s].",
+ StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
+ }
+ }
+
+ // ensure its a supported scheduled state
+ if (ControllerServiceState.DISABLING.equals(state) || ControllerServiceState.ENABLING.equals(state)) {
+ throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
+ StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
+ }
+
+ // if the components are not specified, gather all components and their current revision
+ if (requestEntity.getComponents() == null) {
+ // get the current revisions for the components being updated
+ final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
+ final Set<String> componentIds = new HashSet<>();
+
+ final Predicate<ControllerServiceNode> filter;
+ if (ControllerServiceState.ENABLED.equals(state)) {
+ filter = service -> !service.isActive() && service.isValid();
+ } else {
+ filter = service -> service.isActive();
+ }
+
+ group.findAllControllerServices().stream()
+ .filter(filter)
+ .filter(service -> service.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+ .forEach(service -> componentIds.add(service.getIdentifier()));
+ return componentIds;
+ });
+
+ // build the component mapping
+ final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>();
+ revisions.forEach(revision -> {
+ final RevisionDTO dto = new RevisionDTO();
+ dto.setClientId(revision.getClientId());
+ dto.setVersion(revision.getVersion());
+ componentsToSchedule.put(revision.getComponentId(), dto);
+ });
+
+ // set the components and their current revision
+ requestEntity.setComponents(componentsToSchedule);
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestEntity);
+ }
+
+ final Map<String, RevisionDTO> requestComponentsToSchedule = requestEntity.getComponents();
+ final Map<String, Revision> requestComponentRevisions =
+ requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
+ final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values());
+
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevisions,
+ lookup -> {
+ // ensure access to the flow
+ authorizeFlow();
+
+ // ensure access to every component being scheduled
+ requestComponentsToSchedule.keySet().forEach(componentId -> {
+ final Authorizable authorizable = lookup.getControllerService(componentId).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ });
+ },
+ () -> serviceFacade.verifyActivateControllerServices(id, state, requestComponentRevisions.keySet()),
+ (revisions, scheduleComponentsEntity) -> {
+ final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
+
+ final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
+ final Map<String, Revision> componentRevisions =
+ componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
+
+ // update the controller services
+ final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
return generateOkResponse(entity).build();
}
);