You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/05/04 16:40:14 UTC
[nifi] branch main updated: NIFI-7788 Created a new endpoint in
RemoteProcessGroupResource to allow updating run statuses/transmission
state of all remote process groups within a process group.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5bcfcf4 NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.
5bcfcf4 is described below
commit 5bcfcf42bb121e5a8324bf7443359c0461965233
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Tue Sep 8 18:26:00 2020 +0200
NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.
When run/stop is selected on a process group, the canvas, or a selection, the UI now also tries to enable/disable transmission of all involved remote process groups.
NIFI-7788 Added the same functionality, which was previously missing, for the case when a process group is selected.
NIFI-7788 Updated endpoint URL paths.
NIFI-7788 No need to return list of remote process groups when updating en masse.
NIFI-7788 Added some null checks in RemoteProcessGroupsEndpointMerger.merge.
NIFI-7788 Fix checkstyle violation.
This closes #4516.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../RemoteProcessGroupsEndpointMerger.java | 44 ++++++---
.../org/apache/nifi/web/NiFiServiceFacade.java | 8 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 14 +++
.../nifi/web/api/RemoteProcessGroupResource.java | 102 +++++++++++++++++++++
.../nifi/web/StandardNiFiServiceFacadeTest.java | 72 ++++++++++++++-
.../src/main/webapp/js/nf/canvas/nf-actions.js | 64 ++++++++++---
6 files changed, 274 insertions(+), 30 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
index 79cfa1d..ce5be06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
@@ -32,10 +32,17 @@ import java.util.regex.Pattern;
public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger {
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+ public static final Pattern REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/process-group/[a-f0-9\\-]{36}/run-status");
@Override
public boolean canHandle(final URI uri, final String method) {
- return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
+ if ("GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ return true;
+ } else if ("PUT".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ return true;
+ }
+
+ return false;
}
@Override
@@ -47,24 +54,31 @@ public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger
final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupEntity> rpgEntities = responseEntity.getRemoteProcessGroups();
- final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>();
- for (final NodeResponse nodeResponse : successfulResponses) {
- final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
- final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups();
+ if (rpgEntities != null) {
+ final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : successfulResponses) {
+ final RemoteProcessGroupsEntity nodeResponseEntity =
+ nodeResponse == clientResponse
+ ? responseEntity
+ : nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
+ final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups();
- for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) {
- final NodeIdentifier nodeId = nodeResponse.getNodeId();
- Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeId);
- if (innerMap == null) {
- innerMap = new HashMap<>();
- entityMap.put(nodeRpgEntity.getId(), innerMap);
- }
+ if (nodeRpgEntities != null) {
+ for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) {
+ final NodeIdentifier nodeId = nodeResponse.getNodeId();
+ Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeId);
+ if (innerMap == null) {
+ innerMap = new HashMap<>();
+ entityMap.put(nodeRpgEntity.getId(), innerMap);
+ }
- innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+ innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+ }
+ }
}
- }
- RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap);
+ RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap);
+ }
// create a new client response
return new NodeResponse(clientResponse, responseEntity);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 16df93e..d455836 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1310,6 +1310,14 @@ public interface NiFiServiceFacade {
*/
StatusHistoryEntity getRemoteProcessGroupStatusHistory(String id);
+
+ /**
+ * Verifies that transmission state of all remote process groups within the specified process group can be updated.
+ * @param processGroupId The process group in which to verify remote process groups
+ * @param shouldTransmit The transmission state to verify for
+ */
+ void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit);
+
/**
* Verifies the specified remote process group can be updated.
*
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 804e2d9..5e90135 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -571,6 +571,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit) {
+ List<RemoteProcessGroup> allRemoteProcessGroups = processGroupDAO.getProcessGroup(processGroupId).findAllRemoteProcessGroups();
+
+ allRemoteProcessGroups.stream()
+ .map(remoteProcessGroup -> {
+ final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
+ dto.setId(remoteProcessGroup.getIdentifier());
+ dto.setTransmitting(shouldTransmit);
+ return dto;
+ })
+ .forEach(this::verifyUpdateRemoteProcessGroup);
+ }
+
+ @Override
public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if remote group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index 575e01b..a5ac39c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -24,10 +24,12 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
@@ -39,6 +41,7 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
@@ -58,6 +61,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* RESTful endpoint for managing a Remote group.
@@ -830,6 +834,104 @@ public class RemoteProcessGroupResource extends ApplicationResource {
}
/**
+ * Updates the operational status for all remote process groups in the specified process group with the specified value.
+ *
+ * @param httpServletRequest request
+ * @param processGroupId The id of the process group in which to update all remote process groups.
+ * @param requestRemotePortRunStatusEntity A remotePortRunStatusEntity that holds the desired run status
+ * @return A response wrapping a RemoteProcessGroupsEntity that is left unpopulated; the updated remote process groups are not returned.
+ */
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("process-group/{id}/run-status")
+ @ApiOperation(
+ value = "Updates run status of all remote process groups in a process group (recursively)",
+ response = RemoteProcessGroupEntity.class,
+ authorizations = {
+ @Authorization(value = "Write - /remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response updateRemoteProcessGroupRunStatuses(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The process group id.",
+ required = true
+ )
+ @PathParam("id") String processGroupId,
+ @ApiParam(
+ value = "The remote process groups run status.",
+ required = true
+ ) final RemotePortRunStatusEntity requestRemotePortRunStatusEntity
+ ) {
+ if (requestRemotePortRunStatusEntity == null) {
+ throw new IllegalArgumentException("Remote process group run status must be specified.");
+ }
+
+ requestRemotePortRunStatusEntity.validateState();
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity);
+ } else if (isDisconnectedFromCluster()) {
+ verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(
+ processGroupId,
+ group -> group.findAllRemoteProcessGroups().stream()
+ .filter(remoteProcessGroup ->
+ requestRemotePortRunStatusEntity.getState().equals("TRANSMITTING") && !remoteProcessGroup.isTransmitting()
+ || requestRemotePortRunStatusEntity.getState().equals("STOPPED") && remoteProcessGroup.isTransmitting()
+ )
+ .filter(remoteProcessGroup -> OperationAuthorizable.isOperationAuthorized(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser()))
+ .map(RemoteProcessGroup::getIdentifier)
+ .collect(Collectors.toSet())
+ );
+ return withWriteLock(
+ serviceFacade,
+ requestRemotePortRunStatusEntity,
+ revisions,
+ lookup -> {
+ final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
+
+ authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
+
+ Set<Authorizable> remoteProcessGroups = processGroup.getEncapsulatedRemoteProcessGroups();
+ for (Authorizable remoteProcessGroup : remoteProcessGroups) {
+ OperationAuthorizable.authorizeOperation(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser());
+ }
+ },
+ () -> serviceFacade.verifyUpdateRemoteProcessGroups(processGroupId, shouldTransmit(requestRemotePortRunStatusEntity)),
+ (_revisions, remotePortRunStatusEntity) -> {
+ Set<RemoteProcessGroupEntity> remoteProcessGroupEntities = _revisions.stream()
+ .map(revision -> {
+ final RemoteProcessGroupEntity entity = serviceFacade.updateRemoteProcessGroup(revision, createDTOWithDesiredRunStatus(revision.getComponentId(), remotePortRunStatusEntity));
+ populateRemainingRemoteProcessGroupEntityContent(entity);
+
+ return entity;
+ })
+ .collect(Collectors.toSet());
+
+ RemoteProcessGroupsEntity remoteProcessGroupsEntity = new RemoteProcessGroupsEntity();
+
+ Response response = generateOkResponse(remoteProcessGroupsEntity).build();
+
+ return response;
+ }
+ );
+ }
+
+ /**
* Gets the state for a RemoteProcessGroup.
*
* @param id The id of the RemoteProcessGroup
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 54c0b97..87d6ab5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -38,6 +38,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
@@ -50,6 +51,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -57,18 +59,23 @@ import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
+import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Arrays;
+import java.util.List;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -186,7 +193,7 @@ public class StandardNiFiServiceFacadeTest {
final ControllerFacade controllerFacade = new ControllerFacade();
controllerFacade.setFlowController(flowController);
- processGroupDAO = mock(ProcessGroupDAO.class);
+ processGroupDAO = mock(ProcessGroupDAO.class, Answers.RETURNS_DEEP_STUBS);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
@@ -418,4 +425,67 @@ public class StandardNiFiServiceFacadeTest {
assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
}
+ @Test
+ public void testVerifyUpdateRemoteProcessGroups() throws Exception {
+ // GIVEN
+ RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class);
+ serviceFacade.setRemoteProcessGroupDAO(remoteProcessGroupDAO);
+
+ String groupId = "groupId";
+ boolean shouldTransmit = true;
+
+ String remoteProcessGroupId1 = "remoteProcessGroupId1";
+ String remoteProcessGroupId2 = "remoteProcessGroupId2";
+
+ List<RemoteProcessGroup> remoteProcessGroups = Arrays.asList(
+ // Current 'transmitting' status should not influence the verification, which should be solely based on the 'shouldTransmitting' value
+ mockRemoteProcessGroup(remoteProcessGroupId1, true),
+ mockRemoteProcessGroup(remoteProcessGroupId2, false)
+ );
+
+ List<RemoteProcessGroupDTO> expected = Arrays.asList(
+ createRemoteProcessGroupDTO(remoteProcessGroupId1, shouldTransmit),
+ createRemoteProcessGroupDTO(remoteProcessGroupId2, shouldTransmit)
+ );
+
+ when(processGroupDAO.getProcessGroup(groupId).findAllRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+ expected.stream()
+ .map(RemoteProcessGroupDTO::getId)
+ .forEach(remoteProcessGroupId -> when(remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupId)).thenReturn(true));
+
+
+ // WHEN
+ serviceFacade.verifyUpdateRemoteProcessGroups(groupId, shouldTransmit);
+
+ // THEN
+ ArgumentCaptor<RemoteProcessGroupDTO> remoteProcessGroupDTOArgumentCaptor = ArgumentCaptor.forClass(RemoteProcessGroupDTO.class);
+
+ verify(remoteProcessGroupDAO, times(remoteProcessGroups.size())).verifyUpdate(remoteProcessGroupDTOArgumentCaptor.capture());
+
+ List<RemoteProcessGroupDTO> actual = remoteProcessGroupDTOArgumentCaptor.getAllValues();
+
+ assertEquals(toMap(expected), toMap(actual));
+ }
+
+ private Map<String, Boolean> toMap(List<RemoteProcessGroupDTO> list) {
+ return list.stream().collect(Collectors.toMap(RemoteProcessGroupDTO::getId, RemoteProcessGroupDTO::isTransmitting));
+ }
+
+ private RemoteProcessGroup mockRemoteProcessGroup(String identifier, boolean transmitting) {
+ RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+
+ when(remoteProcessGroup.getIdentifier()).thenReturn(identifier);
+ when(remoteProcessGroup.isTransmitting()).thenReturn(transmitting);
+
+ return remoteProcessGroup;
+ }
+
+ private RemoteProcessGroupDTO createRemoteProcessGroupDTO(String id, boolean transmitting) {
+ RemoteProcessGroupDTO remoteProcessGroup = new RemoteProcessGroupDTO();
+
+ remoteProcessGroup.setId(id);
+ remoteProcessGroup.setTransmitting(transmitting);
+
+ return remoteProcessGroup;
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index d6341c0..5ae7945 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -652,11 +652,18 @@
'id': nfCanvasUtils.getGroupId(),
'state': 'RUNNING'
};
-
updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
+
+ var remoteProcessGroupEntity = {
+ 'state': 'TRANSMITTING'
+ };
+ updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
+ .done(function (response) {
+ nfRemoteProcessGroup.set(response.remoteProcessGroups);
+ });
} else {
var componentsToStart = selection.filter(function (d) {
- return nfCanvasUtils.isRunnable(d3.select(this));
+ return nfCanvasUtils.isRunnable(d3.select(this)) || nfCanvasUtils.canStartTransmitting(d3.select(this));
});
// ensure there are startable components selected
@@ -675,6 +682,12 @@
'id': d.id,
'state': 'RUNNING'
}
+ } else if (nfCanvasUtils.isRemoteProcessGroup(selected)) {
+ uri = d.uri + '/run-status';
+ entity = {
+ 'revision': nfClient.getRevision(d),
+ 'state': 'TRANSMITTING'
+ };
} else {
uri = d.uri + '/run-status';
entity = {
@@ -683,13 +696,21 @@
};
}
- startRequests.push(updateResource(uri, entity).done(function (response) {
- if (nfCanvasUtils.isProcessGroup(selected)) {
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ var remoteProcessGroupEntity = {
+ 'state': 'TRANSMITTING'
+ };
+ var startRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
+ startRequests.push(startRemoteProcessGroups.done(function (response) {}));
+
+ startRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
- } else {
+ }));
+ } else {
+ startRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType(d.type).set(response);
- }
- }));
+ }));
+ }
});
// inform Angular app once the updates have completed
@@ -755,11 +776,18 @@
'id': nfCanvasUtils.getGroupId(),
'state': 'STOPPED'
};
-
updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
+
+ var remoteProcessGroupEntity = {
+ 'state': 'STOPPED'
+ };
+ updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
+ .done(function (response) {
+ nfRemoteProcessGroup.set(response.remoteProcessGroups);
+ });
} else {
var componentsToStop = selection.filter(function (d) {
- return nfCanvasUtils.isStoppable(d3.select(this));
+ return nfCanvasUtils.isStoppable(d3.select(this)) || nfCanvasUtils.canStopTransmitting(d3.select(this));
});
// ensure there are some component to stop
@@ -786,13 +814,21 @@
};
}
- stopRequests.push(updateResource(uri, entity).done(function (response) {
- if (nfCanvasUtils.isProcessGroup(selected)) {
+ if (nfCanvasUtils.isProcessGroup(selected)) {
+ var remoteProcessGroupEntity = {
+ 'state': 'STOPPED'
+ };
+ var stopRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
+ stopRequests.push(stopRemoteProcessGroups.done(function (response) {}));
+
+ stopRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
- } else {
+ }));
+ } else {
+ stopRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType(d.type).set(response);
- }
- }));
+ }));
+ }
});
// inform Angular app once the updates have completed