You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/25 18:58:47 UTC
nifi git commit: NIFI-5066: - Allowing the enable/disable buttons to
be active under the same conditions as the start/stop buttons.
Repository: nifi
Updated Branches:
refs/heads/master 72f8999b1 -> 6938e58c8
NIFI-5066:
- Allowing the enable/disable buttons to be active under the same conditions as the start/stop buttons.
This closes #2633.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6938e58c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6938e58c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6938e58c
Branch: refs/heads/master
Commit: 6938e58c81d857f5d56a694a6fef8cf94e60bce2
Parents: 72f8999
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Apr 12 14:12:01 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Apr 25 14:58:29 2018 -0400
----------------------------------------------------------------------
.../api/entity/ScheduleComponentsEntity.java | 6 +-
.../org/apache/nifi/groups/ProcessGroup.java | 36 +++-
.../nifi/groups/StandardProcessGroup.java | 12 +-
.../apache/nifi/audit/ProcessGroupAuditor.java | 25 +++
.../org/apache/nifi/web/NiFiServiceFacade.java | 2 +
.../nifi/web/StandardNiFiServiceFacade.java | 32 ++++
.../org/apache/nifi/web/api/FlowResource.java | 75 +++++++--
.../apache/nifi/web/dao/ProcessGroupDAO.java | 10 ++
.../web/dao/impl/StandardProcessGroupDAO.java | 36 +++-
.../WEB-INF/partials/canvas/navigation.jsp | 4 +-
.../src/main/webapp/js/nf/canvas/nf-actions.js | 164 +++++++++++--------
.../main/webapp/js/nf/canvas/nf-canvas-utils.js | 40 +++--
12 files changed, 329 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java
index 280d015..dff1ea7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java
@@ -29,6 +29,8 @@ import java.util.Map;
public class ScheduleComponentsEntity extends Entity {
public static final String STATE_RUNNING = "RUNNING";
public static final String STATE_STOPPED = "STOPPED";
+ public static final String STATE_ENABLED = "ENABLED";
+ public static final String STATE_DISABLED = "DISABLED";
private String id;
private String state;
@@ -49,11 +51,11 @@ public class ScheduleComponentsEntity extends Entity {
}
/**
- * @return The desired state of the descendant components. Possible states are 'RUNNING' and 'STOPPED'
+ * @return The desired state of the descendant components. Possible states are 'RUNNING', 'STOPPED', 'ENABLED', and 'DISABLED'
*/
@ApiModelProperty(
value = "The desired state of the descendant components",
- allowableValues = STATE_RUNNING + ", " + STATE_STOPPED
+ allowableValues = STATE_RUNNING + ", " + STATE_STOPPED + ", " + STATE_ENABLED + ", " + STATE_DISABLED
)
public String getState() {
return state;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index d7a9e8b..da9374a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -57,24 +57,44 @@ import org.apache.nifi.remote.RemoteGroupPort;
public interface ProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
/**
- * Predicate for filtering schedulable Processors.
+ * Predicate for starting eligible Processors.
*/
- Predicate<ProcessorNode> SCHEDULABLE_PROCESSORS = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.isValid();
+ Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.isValid();
/**
- * Predicate for filtering unschedulable Processors.
+ * Predicate for stopping eligible Processors.
*/
- Predicate<ProcessorNode> UNSCHEDULABLE_PROCESSORS = node -> node.isRunning();
+ Predicate<ProcessorNode> STOP_PROCESSORS_FILTER = node -> node.isRunning();
/**
- * Predicate for filtering schedulable Ports
+ * Predicate for enabling eligible Processors.
*/
- Predicate<Port> SCHEDULABLE_PORTS = port -> !port.isRunning() && !ScheduledState.DISABLED.equals(port.getScheduledState()) && port.isValid();
+ Predicate<ProcessorNode> ENABLE_PROCESSORS_FILTER = node -> ScheduledState.DISABLED.equals(node.getScheduledState());
/**
- * Predicate for filtering schedulable Ports
+ * Predicate for disabling eligible Processors.
*/
- Predicate<Port> UNSCHEDULABLE_PORTS = port -> ScheduledState.RUNNING.equals(port.getScheduledState());
+ Predicate<ProcessorNode> DISABLE_PROCESSORS_FILTER = node -> !node.isRunning();
+
+ /**
+ * Predicate for starting eligible Ports.
+ */
+ Predicate<Port> START_PORTS_FILTER = port -> !port.isRunning() && !ScheduledState.DISABLED.equals(port.getScheduledState()) && port.isValid();
+
+ /**
+ * Predicate for stopping eligible Ports.
+ */
+ Predicate<Port> STOP_PORTS_FILTER = port -> ScheduledState.RUNNING.equals(port.getScheduledState());
+
+ /**
+ * Predicate for enabling eligible Ports.
+ */
+ Predicate<Port> ENABLE_PORTS_FILTER = port -> ScheduledState.DISABLED.equals(port.getScheduledState());
+
+ /**
+ * Predicate for disabling eligible Ports.
+ */
+ Predicate<Port> DISABLE_PORTS_FILTER = port -> !port.isRunning();
/**
* @return a reference to this ProcessGroup's parent. This will be
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index d2167f4..94196d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -412,7 +412,7 @@ public final class StandardProcessGroup implements ProcessGroup {
public void startProcessing() {
readLock.lock();
try {
- findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> {
+ findAllProcessors().stream().filter(START_PROCESSORS_FILTER).forEach(node -> {
try {
node.getProcessGroup().startProcessor(node, true);
} catch (final Throwable t) {
@@ -420,11 +420,11 @@ public final class StandardProcessGroup implements ProcessGroup {
}
});
- findAllInputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> {
+ findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
port.getProcessGroup().startInputPort(port);
});
- findAllOutputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> {
+ findAllOutputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> {
port.getProcessGroup().startOutputPort(port);
});
} finally {
@@ -436,7 +436,7 @@ public final class StandardProcessGroup implements ProcessGroup {
public void stopProcessing() {
readLock.lock();
try {
- findAllProcessors().stream().filter(UNSCHEDULABLE_PROCESSORS).forEach(node -> {
+ findAllProcessors().stream().filter(STOP_PROCESSORS_FILTER).forEach(node -> {
try {
node.getProcessGroup().stopProcessor(node);
} catch (final Throwable t) {
@@ -444,11 +444,11 @@ public final class StandardProcessGroup implements ProcessGroup {
}
});
- findAllInputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> {
+ findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
port.getProcessGroup().stopInputPort(port);
});
- findAllOutputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> {
+ findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> {
port.getProcessGroup().stopOutputPort(port);
});
} finally {
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
index 3ba5bea..3d92f5b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java
@@ -197,6 +197,31 @@ public class ProcessGroupAuditor extends NiFiAuditor {
return result;
}
+ /**
+ * Audits the enabling or disabling of components within a process group.
+ *
+ * @param proceedingJoinPoint join point
+ * @param groupId group id
+ * @param state scheduled state
+ * @throws Throwable ex
+ */
+ @Around("within(org.apache.nifi.web.dao.ProcessGroupDAO+) && "
+ + "execution(void enableComponents(String, org.apache.nifi.controller.ScheduledState, java.util.Set<String>)) && "
+ + "args(groupId, state, componentIds)")
+ public void enableComponentsAdvice(ProceedingJoinPoint proceedingJoinPoint, String groupId, ScheduledState state, Set<String> componentIds) throws Throwable {
+ final Operation operation;
+
+ proceedingJoinPoint.proceed();
+
+ // determine whether this was an enable or a disable operation
+ if (ScheduledState.DISABLED.equals(state)) {
+ operation = Operation.Disable;
+ } else {
+ operation = Operation.Enable;
+ }
+
+ saveUpdateAction(groupId, operation);
+ }
/**
* Audits the update of controller serivce state
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index ba813ff..e402e1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1028,6 +1028,8 @@ public interface NiFiServiceFacade {
*/
ActivateControllerServicesEntity activateControllerServices(String processGroupId, ControllerServiceState state, Map<String, Revision> serviceRevisions);
+ ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions);
+
/**
* Schedules all applicable components under the specified ProcessGroup on behalf of the currently logged in user.
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index ff52883..241c83d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -969,6 +969,38 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
+ UpdateRevisionTask<ScheduleComponentsEntity>() {
+ @Override
+ public RevisionUpdate<ScheduleComponentsEntity> update() {
+ // enable or disable the components
+ processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet());
+
+ // update the revisions
+ final Map<String, Revision> updatedRevisions = new HashMap<>();
+ for (final Revision revision : componentRevisions.values()) {
+ final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+ updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+ }
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ScheduleComponentsEntity entity = new ScheduleComponentsEntity();
+ entity.setId(processGroupId);
+ entity.setState(state.name());
+ return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+ }
+ });
+
+ return updatedComponent.getComponent();
+ }
+
+ @Override
public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 60b811e..0382e8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -34,6 +34,8 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
@@ -131,7 +133,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_DISABLED;
+import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_ENABLED;
/**
* RESTful endpoint for managing a Flow.
@@ -563,27 +570,61 @@ public class FlowResource extends ApplicationResource {
if (requestScheduleComponentsEntity.getState() == null) {
throw new IllegalArgumentException("The scheduled state must be specified.");
} else {
- try {
- state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState());
- } catch (final IllegalArgumentException iae) {
- throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", ")));
+ if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
+ state = ScheduledState.STOPPED;
+ } else {
+ try {
+ state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState());
+ } catch (final IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
+ StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", ")));
+ }
}
}
// ensure its a supported scheduled state
- if (ScheduledState.DISABLED.equals(state) || ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) {
- throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", ")));
+ if (ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) {
+ throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
+ StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", ")));
}
// if the components are not specified, gather all components and their current revision
if (requestScheduleComponentsEntity.getComponents() == null) {
+ final Supplier<Predicate<ProcessorNode>> getProcessorFilter = () -> {
+ if (ScheduledState.RUNNING.equals(state)) {
+ return ProcessGroup.START_PROCESSORS_FILTER;
+ } else if (ScheduledState.STOPPED.equals(state)) {
+ if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
+ return ProcessGroup.ENABLE_PROCESSORS_FILTER;
+ } else {
+ return ProcessGroup.STOP_PROCESSORS_FILTER;
+ }
+ } else {
+ return ProcessGroup.DISABLE_PROCESSORS_FILTER;
+ }
+ };
+
+ final Supplier<Predicate<Port>> getPortFilter = () -> {
+ if (ScheduledState.RUNNING.equals(state)) {
+ return ProcessGroup.START_PORTS_FILTER;
+ } else if (ScheduledState.STOPPED.equals(state)) {
+ if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) {
+ return ProcessGroup.ENABLE_PORTS_FILTER;
+ } else {
+ return ProcessGroup.STOP_PORTS_FILTER;
+ }
+ } else {
+ return ProcessGroup.DISABLE_PORTS_FILTER;
+ }
+ };
+
// get the current revisions for the components being updated
final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
final Set<String> componentIds = new HashSet<>();
// ensure authorized for each processor we will attempt to schedule
group.findAllProcessors().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
+ .filter(getProcessorFilter.get())
.filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(processor -> {
componentIds.add(processor.getIdentifier());
@@ -591,7 +632,7 @@ public class FlowResource extends ApplicationResource {
// ensure authorized for each input port we will attempt to schedule
group.findAllInputPorts().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(getPortFilter.get())
.filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(inputPort -> {
componentIds.add(inputPort.getIdentifier());
@@ -599,7 +640,7 @@ public class FlowResource extends ApplicationResource {
// ensure authorized for each output port we will attempt to schedule
group.findAllOutputPorts().stream()
- .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(getPortFilter.get())
.filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(outputPort -> {
componentIds.add(outputPort.getIdentifier());
@@ -646,14 +687,26 @@ public class FlowResource extends ApplicationResource {
},
() -> serviceFacade.verifyScheduleComponents(id, state, requestComponentRevisions.keySet()),
(revisions, scheduleComponentsEntity) -> {
- final ScheduledState scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState());
+
+ final ScheduledState scheduledState;
+ if (STATE_ENABLED.equals(scheduleComponentsEntity.getState())) {
+ scheduledState = ScheduledState.STOPPED;
+ } else {
+ scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState());
+ }
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the process group
- final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+ final ScheduleComponentsEntity entity;
+ if (STATE_ENABLED.equals(scheduleComponentsEntity.getState()) || STATE_DISABLED.equals(scheduleComponentsEntity.getState())) {
+ entity = serviceFacade.enableComponents(id, scheduledState, componentRevisions);
+ } else {
+ entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+ }
+
return generateOkResponse(entity).build();
}
);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 36fd7dc..5cf7ecb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -84,12 +84,22 @@ public interface ProcessGroupDAO {
*
* @param groupId id
* @param state scheduled state
+ * @param componentIds components
*
* @return a Future that can be used to wait for the services to finish starting or stopping
*/
Future<Void> scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
/**
+ * Enables or disables the components in the specified process group.
+ *
+ * @param groupId id
+ * @param state scheduled state
+ * @param componentIds components
+ */
+ void enableComponents(String groupId, ScheduledState state, Set<String> componentIds);
+
+ /**
* Enables or disables the controller services in the specified process group
*
* @param groupId id
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index ca47764..265d128 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -171,7 +171,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
remotePort.getRemoteProcessGroup().startTransmitting(remotePort);
break;
}
- } else {
+ } else if (ScheduledState.STOPPED.equals(state)) {
switch (connectable.getConnectableType()) {
case PROCESSOR:
final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
@@ -196,6 +196,40 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
+ public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
+ final ProcessGroup group = locateProcessGroup(flowController, groupId);
+
+ for (final String componentId : componentIds) {
+ final Connectable connectable = group.findLocalConnectable(componentId);
+ if (ScheduledState.STOPPED.equals(state)) {
+ switch (connectable.getConnectableType()) {
+ case PROCESSOR:
+ connectable.getProcessGroup().enableProcessor((ProcessorNode) connectable);
+ break;
+ case INPUT_PORT:
+ connectable.getProcessGroup().enableInputPort((Port) connectable);
+ break;
+ case OUTPUT_PORT:
+ connectable.getProcessGroup().enableOutputPort((Port) connectable);
+ break;
+ }
+ } else if (ScheduledState.DISABLED.equals(state)) {
+ switch (connectable.getConnectableType()) {
+ case PROCESSOR:
+ connectable.getProcessGroup().disableProcessor((ProcessorNode) connectable);
+ break;
+ case INPUT_PORT:
+ connectable.getProcessGroup().disableInputPort((Port) connectable);
+ break;
+ case OUTPUT_PORT:
+ connectable.getProcessGroup().disableOutputPort((Port) connectable);
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
final List<ControllerServiceNode> serviceNodes = serviceIds.stream()
.map(flowController::getControllerServiceNode)
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp
index f122076..67bda8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp
@@ -107,13 +107,13 @@
<div class="button-spacer-large"> </div>
<div id="operate-enable" class="action-button" title="Enable">
<button ng-click="appCtrl.nf.Actions['enable'](appCtrl.nf.CanvasUtils.getSelection());"
- ng-disabled="!appCtrl.nf.CanvasUtils.canEnable(appCtrl.nf.CanvasUtils.getSelection());">
+ ng-disabled="!appCtrl.nf.CanvasUtils.getSelection().empty() && !appCtrl.nf.CanvasUtils.canModify(appCtrl.nf.CanvasUtils.getSelection());">
<div class="graph-control-action-icon fa fa-flash"></div></button>
</div>
<div class="button-spacer-small"> </div>
<div id="operate-disable" class="action-button" title="Disable">
<button ng-click="appCtrl.nf.Actions['disable'](appCtrl.nf.CanvasUtils.getSelection());"
- ng-disabled="!appCtrl.nf.CanvasUtils.canDisable(appCtrl.nf.CanvasUtils.getSelection());">
+ ng-disabled="!appCtrl.nf.CanvasUtils.getSelection().empty() && !appCtrl.nf.CanvasUtils.canModify(appCtrl.nf.CanvasUtils.getSelection());">
<div class="graph-control-action-icon icon icon-enable-false"></div></button>
</div>
<div class="button-spacer-large"> </div>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index 2b09d14..41e945b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -502,39 +502,58 @@
* @argument {selection} selection The selection
*/
enable: function (selection) {
- var componentsToEnable = nfCanvasUtils.filterEnable(selection);
+ if (selection.empty()) {
+ // build the entity
+ var entity = {
+ 'id': nfCanvasUtils.getGroupId(),
+ 'state': 'ENABLED'
+ };
- if (componentsToEnable.empty()) {
- nfDialog.showOkDialog({
- headerText: 'Enable Components',
- dialogContent: 'No eligible components are selected. Please select the components to be enabled and ensure they are no longer running.'
- });
+ updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
} else {
- var enableRequests = [];
-
- // enable the selected processors
- componentsToEnable.each(function (d) {
- var selected = d3.select(this);
-
- // build the entity
- var entity = {
- 'revision': nfClient.getRevision(d),
- 'component': {
- 'id': d.id,
- 'state': 'STOPPED'
- }
- };
+ var componentsToEnable = nfCanvasUtils.filterEnable(selection);
- enableRequests.push(updateResource(d.uri, entity).done(function (response) {
- nfCanvasUtils.getComponentByType(d.type).set(response);
- }));
- });
+ if (!componentsToEnable.empty()) {
+ var enableRequests = [];
- // inform Angular app once the updates have completed
- if (enableRequests.length > 0) {
- $.when.apply(window, enableRequests).always(function () {
- nfNgBridge.digest();
+ // enable the selected processors
+ componentsToEnable.each(function (d) {
+ var selected = d3.select(this);
+
+ // prepare the request
+ var uri, entity;
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ uri = config.urls.api + '/flow/process-groups/' + encodeURIComponent(d.id);
+ entity = {
+ 'id': d.id,
+ 'state': 'ENABLED'
+ }
+ } else {
+ uri = d.uri;
+ entity = {
+ 'revision': nfClient.getRevision(d),
+ 'component': {
+ 'id': d.id,
+ 'state': 'STOPPED'
+ }
+ };
+ }
+
+ enableRequests.push(updateResource(uri, entity).done(function (response) {
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
+ } else {
+ nfCanvasUtils.getComponentByType(d.type).set(response);
+ }
+ }));
});
+
+ // inform Angular app once the updates have completed
+ if (enableRequests.length > 0) {
+ $.when.apply(window, enableRequests).always(function () {
+ nfNgBridge.digest();
+ });
+ }
}
}
},
@@ -545,39 +564,58 @@
* @argument {selection} selection The selection
*/
disable: function (selection) {
- var componentsToDisable = nfCanvasUtils.filterDisable(selection);
+ if (selection.empty()) {
+ // build the entity
+ var entity = {
+ 'id': nfCanvasUtils.getGroupId(),
+ 'state': 'DISABLED'
+ };
- if (componentsToDisable.empty()) {
- nfDialog.showOkDialog({
- headerText: 'Disable Components',
- dialogContent: 'No eligible components are selected. Please select the components to be disabled and ensure they are no longer running.'
- });
+ updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
} else {
- var disableRequests = [];
-
- // disable the selected components
- componentsToDisable.each(function (d) {
- var selected = d3.select(this);
-
- // build the entity
- var entity = {
- 'revision': nfClient.getRevision(d),
- 'component': {
- 'id': d.id,
- 'state': 'DISABLED'
- }
- };
+ var componentsToDisable = nfCanvasUtils.filterDisable(selection);
- disableRequests.push(updateResource(d.uri, entity).done(function (response) {
- nfCanvasUtils.getComponentByType(d.type).set(response);
- }));
- });
+ if (!componentsToDisable.empty()) {
+ var disableRequests = [];
- // inform Angular app once the updates have completed
- if (disableRequests.length > 0) {
- $.when.apply(window, disableRequests).always(function () {
- nfNgBridge.digest();
+ // disable the selected components
+ componentsToDisable.each(function (d) {
+ var selected = d3.select(this);
+
+ // prepare the request
+ var uri, entity;
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ uri = config.urls.api + '/flow/process-groups/' + encodeURIComponent(d.id);
+ entity = {
+ 'id': d.id,
+ 'state': 'DISABLED'
+ }
+ } else {
+ uri = d.uri;
+ entity = {
+ 'revision': nfClient.getRevision(d),
+ 'component': {
+ 'id': d.id,
+ 'state': 'DISABLED'
+ }
+ };
+ }
+
+ disableRequests.push(updateResource(uri, entity).done(function (response) {
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
+ } else {
+ nfCanvasUtils.getComponentByType(d.type).set(response);
+ }
+ }));
});
+
+ // inform Angular app once the updates have completed
+ if (disableRequests.length > 0) {
+ $.when.apply(window, disableRequests).always(function () {
+ nfNgBridge.digest();
+ });
+ }
}
}
},
@@ -618,12 +656,7 @@
});
// ensure there are startable components selected
- if (componentsToStart.empty()) {
- nfDialog.showOkDialog({
- headerText: 'Start Components',
- dialogContent: 'No eligible components are selected. Please select the components to be started and ensure they are no longer running.'
- });
- } else {
+ if (!componentsToStart.empty()) {
var startRequests = [];
// start each selected component
@@ -688,12 +721,7 @@
});
// ensure there are some component to stop
- if (componentsToStop.empty()) {
- nfDialog.showOkDialog({
- headerText: 'Stop Components',
- dialogContent: 'No eligible components are selected. Please select the components to be stopped.'
- });
- } else {
+ if (!componentsToStop.empty()) {
var stopRequests = [];
// stop each selected component
http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
index 7a8cf65..d6785bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js
@@ -1141,10 +1141,19 @@
var selected = d3.select(this);
var selectedData = selected.datum();
+ // enable always allowed for PGs since they will invoke the /flow endpoint for enabling all applicable components (based on permissions)
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ return true;
+ }
+
+ // not a PG, verify permissions to modify
+ if (nfCanvasUtils.canModify(selected) === false) {
+ return false;
+ }
+
// ensure its a processor, input port, or output port and supports modification and is disabled (can enable)
return ((nfCanvasUtils.isProcessor(selected) || nfCanvasUtils.isInputPort(selected) || nfCanvasUtils.isOutputPort(selected)) &&
- nfCanvasUtils.supportsModification(selected) &&
- selectedData.status.aggregateSnapshot.runStatus === 'Disabled');
+ nfCanvasUtils.supportsModification(selected) && selectedData.status.aggregateSnapshot.runStatus === 'Disabled');
});
},
@@ -1155,11 +1164,7 @@
*/
canEnable: function (selection) {
if (selection.empty()) {
- return false;
- }
-
- if (nfCanvasUtils.canModify(selection) === false) {
- return false;
+ return true;
}
return nfCanvasUtils.filterEnable(selection).size() === selection.size();
@@ -1175,11 +1180,20 @@
var selected = d3.select(this);
var selectedData = selected.datum();
+ // disable always allowed for PGs since they will invoke the /flow endpoint for disabling all applicable components (based on permissions)
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ return true;
+ }
+
+ // not a PG, verify permissions to modify
+ if (nfCanvasUtils.canModify(selected) === false) {
+ return false;
+ }
+
// ensure its a processor, input port, or output port and supports modification and is stopped (can disable)
return ((nfCanvasUtils.isProcessor(selected) || nfCanvasUtils.isInputPort(selected) || nfCanvasUtils.isOutputPort(selected)) &&
- nfCanvasUtils.supportsModification(selected) &&
- (selectedData.status.aggregateSnapshot.runStatus === 'Stopped' ||
- selectedData.status.aggregateSnapshot.runStatus === 'Invalid'));
+ nfCanvasUtils.supportsModification(selected) &&
+ (selectedData.status.aggregateSnapshot.runStatus === 'Stopped' || selectedData.status.aggregateSnapshot.runStatus === 'Invalid'));
});
},
@@ -1190,11 +1204,7 @@
*/
canDisable: function (selection) {
if (selection.empty()) {
- return false;
- }
-
- if (nfCanvasUtils.canModify(selection) === false) {
- return false;
+ return true;
}
return nfCanvasUtils.filterDisable(selection).size() === selection.size();