You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/05/04 16:40:14 UTC

[nifi] branch main updated: NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bcfcf4  NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.
5bcfcf4 is described below

commit 5bcfcf42bb121e5a8324bf7443359c0461965233
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Tue Sep 8 18:26:00 2020 +0200

    NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.
    
    When selecting run/stop on a process group/canvas/selection, it will try to enable/disable transmission of all involved remote process groups.
    
    NIFI-7788 Supplied same functionality missed when selecting a process group.
    NIFI-7788 Updated endpoint URL paths.
    NIFI-7788 No need to return list of remote process groups when updating en masse.
    NIFI-7788 Added some null checks in RemoteProcessGroupsEndpointMerger.merge.
    NIFI-7788 Fix checkstyle violation.
    
    This closes #4516.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../RemoteProcessGroupsEndpointMerger.java         |  44 ++++++---
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   8 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  14 +++
 .../nifi/web/api/RemoteProcessGroupResource.java   | 102 +++++++++++++++++++++
 .../nifi/web/StandardNiFiServiceFacadeTest.java    |  72 ++++++++++++++-
 .../src/main/webapp/js/nf/canvas/nf-actions.js     |  64 ++++++++++---
 6 files changed, 274 insertions(+), 30 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
index 79cfa1d..ce5be06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupsEndpointMerger.java
@@ -32,10 +32,17 @@ import java.util.regex.Pattern;
 
 public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger {
     public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+    public static final Pattern REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/process-group/[a-f0-9\\-]{36}/run-status");
 
     @Override
     public boolean canHandle(final URI uri, final String method) {
-        return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
+        if ("GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("PUT".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
     }
 
     @Override
@@ -47,24 +54,31 @@ public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger
         final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
         final Set<RemoteProcessGroupEntity> rpgEntities = responseEntity.getRemoteProcessGroups();
 
-        final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>();
-        for (final NodeResponse nodeResponse : successfulResponses) {
-            final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
-            final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups();
+        if (rpgEntities != null) {
+            final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : successfulResponses) {
+                final RemoteProcessGroupsEntity nodeResponseEntity =
+                    nodeResponse == clientResponse
+                        ? responseEntity
+                        : nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
+                final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups();
 
-            for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) {
-                final NodeIdentifier nodeId = nodeResponse.getNodeId();
-                Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeId);
-                if (innerMap == null) {
-                    innerMap = new HashMap<>();
-                    entityMap.put(nodeRpgEntity.getId(), innerMap);
-                }
+                if (nodeRpgEntities != null) {
+                    for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) {
+                        final NodeIdentifier nodeId = nodeResponse.getNodeId();
+                        Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeId);
+                        if (innerMap == null) {
+                            innerMap = new HashMap<>();
+                            entityMap.put(nodeRpgEntity.getId(), innerMap);
+                        }
 
-                innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+                        innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
+                    }
+                }
             }
-        }
 
-        RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap);
+            RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap);
+        }
 
         // create a new client response
         return new NodeResponse(clientResponse, responseEntity);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 16df93e..d455836 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1310,6 +1310,14 @@ public interface NiFiServiceFacade {
      */
     StatusHistoryEntity getRemoteProcessGroupStatusHistory(String id);
 
+
+    /**
+     * Verifies that transmission state of all remote process groups within the specified process group can be updated.
+     * @param processGroupId The process group in which to verify remote process groups
+     * @param shouldTransmit The transmission state to verify for
+     */
+    void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit);
+
     /**
      * Verifies the specified remote process group can be updated.
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 804e2d9..5e90135 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -571,6 +571,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit) {
+        List<RemoteProcessGroup> allRemoteProcessGroups = processGroupDAO.getProcessGroup(processGroupId).findAllRemoteProcessGroups();
+
+        allRemoteProcessGroups.stream()
+            .map(remoteProcessGroup -> {
+                final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
+                dto.setId(remoteProcessGroup.getIdentifier());
+                dto.setTransmitting(shouldTransmit);
+                return dto;
+            })
+            .forEach(this::verifyUpdateRemoteProcessGroup);
+    }
+
+    @Override
     public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
         // if remote group does not exist, then the update request is likely creating it
         // so we don't verify since it will fail
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index 575e01b..a5ac39c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -24,10 +24,12 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.ProcessGroupAuthorizable;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.OperationAuthorizable;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.ComponentStateDTO;
@@ -39,6 +41,7 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
@@ -58,6 +61,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RESTful endpoint for managing a Remote group.
@@ -830,6 +834,104 @@ public class RemoteProcessGroupResource extends ApplicationResource {
     }
 
     /**
+     * Updates the operational status for all remote process groups in the specified process group with the specified value.
+     *
+     * @param httpServletRequest                request
+     * @param processGroupId                    The id of the process group in which all remote process groups to update.
+     * @param requestRemotePortRunStatusEntity  A remotePortRunStatusEntity that holds the desired run status
+     * @return A response with an array of RemoteProcessGroupEntity objects.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-group/{id}/run-status")
+    @ApiOperation(
+            value = "Updates run status of all remote process groups in a process group (recursively)",
+            response = RemoteProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response updateRemoteProcessGroupRunStatuses(
+            @Context HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") String processGroupId,
+            @ApiParam(
+                    value = "The remote process groups run status.",
+                    required = true
+            ) final RemotePortRunStatusEntity requestRemotePortRunStatusEntity
+    ) {
+        if (requestRemotePortRunStatusEntity == null) {
+            throw new IllegalArgumentException("Remote process group run status must be specified.");
+        }
+
+        requestRemotePortRunStatusEntity.validateState();
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity);
+        } else if (isDisconnectedFromCluster()) {
+            verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(
+            processGroupId,
+            group -> group.findAllRemoteProcessGroups().stream()
+                .filter(remoteProcessGroup ->
+                    requestRemotePortRunStatusEntity.getState().equals("TRANSMITTING") && !remoteProcessGroup.isTransmitting()
+                    || requestRemotePortRunStatusEntity.getState().equals("STOPPED") && remoteProcessGroup.isTransmitting()
+                )
+                .filter(remoteProcessGroup -> OperationAuthorizable.isOperationAuthorized(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser()))
+                .map(RemoteProcessGroup::getIdentifier)
+                .collect(Collectors.toSet())
+        );
+        return withWriteLock(
+            serviceFacade,
+            requestRemotePortRunStatusEntity,
+            revisions,
+            lookup -> {
+                final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
+
+                authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
+
+                Set<Authorizable> remoteProcessGroups = processGroup.getEncapsulatedRemoteProcessGroups();
+                for (Authorizable remoteProcessGroup : remoteProcessGroups) {
+                    OperationAuthorizable.authorizeOperation(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser());
+                }
+            },
+            () -> serviceFacade.verifyUpdateRemoteProcessGroups(processGroupId, shouldTransmit(requestRemotePortRunStatusEntity)),
+            (_revisions, remotePortRunStatusEntity) -> {
+                Set<RemoteProcessGroupEntity> remoteProcessGroupEntities = _revisions.stream()
+                    .map(revision -> {
+                        final RemoteProcessGroupEntity entity = serviceFacade.updateRemoteProcessGroup(revision, createDTOWithDesiredRunStatus(revision.getComponentId(), remotePortRunStatusEntity));
+                        populateRemainingRemoteProcessGroupEntityContent(entity);
+
+                        return entity;
+                    })
+                    .collect(Collectors.toSet());
+
+                RemoteProcessGroupsEntity remoteProcessGroupsEntity = new RemoteProcessGroupsEntity();
+
+                Response response = generateOkResponse(remoteProcessGroupsEntity).build();
+
+                return response;
+            }
+        );
+    }
+
+    /**
      * Gets the state for a RemoteProcessGroup.
      *
      * @param id The id of the RemoteProcessGroup
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 54c0b97..87d6ab5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -38,6 +38,7 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.nar.ExtensionManager;
@@ -50,6 +51,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.EntityFactory;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -57,18 +59,23 @@ import org.apache.nifi.web.api.entity.ActionEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
+import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
 import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -186,7 +193,7 @@ public class StandardNiFiServiceFacadeTest {
         final ControllerFacade controllerFacade = new ControllerFacade();
         controllerFacade.setFlowController(flowController);
 
-        processGroupDAO = mock(ProcessGroupDAO.class);
+        processGroupDAO = mock(ProcessGroupDAO.class, Answers.RETURNS_DEEP_STUBS);
 
         serviceFacade = new StandardNiFiServiceFacade();
         serviceFacade.setAuditService(auditService);
@@ -418,4 +425,67 @@ public class StandardNiFiServiceFacadeTest {
         assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
     }
 
+    @Test
+    public void testVerifyUpdateRemoteProcessGroups() throws Exception {
+        // GIVEN
+        RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class);
+        serviceFacade.setRemoteProcessGroupDAO(remoteProcessGroupDAO);
+
+        String groupId = "groupId";
+        boolean shouldTransmit = true;
+
+        String remoteProcessGroupId1 = "remoteProcessGroupId1";
+        String remoteProcessGroupId2 = "remoteProcessGroupId2";
+
+        List<RemoteProcessGroup> remoteProcessGroups = Arrays.asList(
+            // Current 'transmitting' status should not influence the verification, which should be solely based on the 'shouldTransmitting' value
+            mockRemoteProcessGroup(remoteProcessGroupId1, true),
+            mockRemoteProcessGroup(remoteProcessGroupId2, false)
+        );
+
+        List<RemoteProcessGroupDTO> expected = Arrays.asList(
+            createRemoteProcessGroupDTO(remoteProcessGroupId1, shouldTransmit),
+            createRemoteProcessGroupDTO(remoteProcessGroupId2, shouldTransmit)
+        );
+
+        when(processGroupDAO.getProcessGroup(groupId).findAllRemoteProcessGroups()).thenReturn(remoteProcessGroups);
+        expected.stream()
+            .map(RemoteProcessGroupDTO::getId)
+            .forEach(remoteProcessGroupId -> when(remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupId)).thenReturn(true));
+
+
+        // WHEN
+        serviceFacade.verifyUpdateRemoteProcessGroups(groupId, shouldTransmit);
+
+        // THEN
+        ArgumentCaptor<RemoteProcessGroupDTO> remoteProcessGroupDTOArgumentCaptor = ArgumentCaptor.forClass(RemoteProcessGroupDTO.class);
+
+        verify(remoteProcessGroupDAO, times(remoteProcessGroups.size())).verifyUpdate(remoteProcessGroupDTOArgumentCaptor.capture());
+
+        List<RemoteProcessGroupDTO> actual = remoteProcessGroupDTOArgumentCaptor.getAllValues();
+
+        assertEquals(toMap(expected), toMap(actual));
+    }
+
+    private Map<String, Boolean> toMap(List<RemoteProcessGroupDTO> list) {
+        return list.stream().collect(Collectors.toMap(RemoteProcessGroupDTO::getId, RemoteProcessGroupDTO::isTransmitting));
+    }
+
+    private RemoteProcessGroup mockRemoteProcessGroup(String identifier, boolean transmitting) {
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+
+        when(remoteProcessGroup.getIdentifier()).thenReturn(identifier);
+        when(remoteProcessGroup.isTransmitting()).thenReturn(transmitting);
+
+        return remoteProcessGroup;
+    }
+
+    private RemoteProcessGroupDTO createRemoteProcessGroupDTO(String id, boolean transmitting) {
+        RemoteProcessGroupDTO remoteProcessGroup = new RemoteProcessGroupDTO();
+
+        remoteProcessGroup.setId(id);
+        remoteProcessGroup.setTransmitting(transmitting);
+
+        return remoteProcessGroup;
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
index d6341c0..5ae7945 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js
@@ -652,11 +652,18 @@
                     'id': nfCanvasUtils.getGroupId(),
                     'state': 'RUNNING'
                 };
-
                 updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
+
+                var remoteProcessGroupEntity = {
+                    'state': 'TRANSMITTING'
+                };
+                updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
+                    .done(function (response) {
+                        nfRemoteProcessGroup.set(response.remoteProcessGroups);
+                    });
             } else {
                 var componentsToStart = selection.filter(function (d) {
-                    return nfCanvasUtils.isRunnable(d3.select(this));
+                    return nfCanvasUtils.isRunnable(d3.select(this)) || nfCanvasUtils.canStartTransmitting(d3.select(this));
                 });
 
                 // ensure there are startable components selected
@@ -675,6 +682,12 @@
                                 'id': d.id,
                                 'state': 'RUNNING'
                             }
+                        } else if (nfCanvasUtils.isRemoteProcessGroup(selected)) {
+                            uri = d.uri + '/run-status';
+                            entity = {
+                                'revision': nfClient.getRevision(d),
+                                'state': 'TRANSMITTING'
+                            };
                         } else {
                             uri = d.uri + '/run-status';
                             entity = {
@@ -683,13 +696,21 @@
                             };
                         }
 
-                        startRequests.push(updateResource(uri, entity).done(function (response) {
-                            if (nfCanvasUtils.isProcessGroup(selected)) {
+                        if (nfCanvasUtils.isProcessGroup(selected)) {
+                            var remoteProcessGroupEntity = {
+                                'state': 'TRANSMITTING'
+                            };
+                            var startRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
+                            startRequests.push(startRemoteProcessGroups.done(function (response) {}));
+
+                            startRequests.push(updateResource(uri, entity).done(function (response) {
                                 nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
-                            } else {
+                            }));
+                        } else {
+                            startRequests.push(updateResource(uri, entity).done(function (response) {
                                 nfCanvasUtils.getComponentByType(d.type).set(response);
-                            }
-                        }));
+                            }));
+                        }
                     });
 
                     // inform Angular app once the updates have completed
@@ -755,11 +776,18 @@
                     'id': nfCanvasUtils.getGroupId(),
                     'state': 'STOPPED'
                 };
-
                 updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
+
+                var remoteProcessGroupEntity = {
+                    'state': 'STOPPED'
+                };
+                updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
+                    .done(function (response) {
+                        nfRemoteProcessGroup.set(response.remoteProcessGroups);
+                    });
             } else {
                 var componentsToStop = selection.filter(function (d) {
-                    return nfCanvasUtils.isStoppable(d3.select(this));
+                    return nfCanvasUtils.isStoppable(d3.select(this)) || nfCanvasUtils.canStopTransmitting(d3.select(this));
                 });
 
                 // ensure there are some component to stop
@@ -786,13 +814,21 @@
                             };
                         }
 
-                        stopRequests.push(updateResource(uri, entity).done(function (response) {
-                            if (nfCanvasUtils.isProcessGroup(selected)) {
+                        if (nfCanvasUtils.isProcessGroup(selected)) {
+                            var remoteProcessGroupEntity = {
+                                'state': 'STOPPED'
+                            };
+                            var stopRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
+                            stopRequests.push(stopRemoteProcessGroups.done(function (response) {}));
+
+                            stopRequests.push(updateResource(uri, entity).done(function (response) {
                                 nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
-                            } else {
+                            }));
+                        } else {
+                            stopRequests.push(updateResource(uri, entity).done(function (response) {
                                 nfCanvasUtils.getComponentByType(d.type).set(response);
-                            }
-                        }));
+                            }));
+                        }
                     });
 
                     // inform Angular app once the updates have completed