You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:00 UTC
[12/50] nifi git commit: NIFI-4436: - Initial checkpoint: able to
start version control and detect changes, in standalone mode,
still 'crude' implementation - Checkpoint: Can place flow under version
control and can determine if modified - Checkpoint: Ch
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
index 1dc74ab..23f723e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
@@ -36,6 +36,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.NiFiProperties;
@@ -64,6 +65,7 @@ public class TestStandardReportingContext {
private Bundle systemBundle;
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
+ private FlowRegistryClient flowRegistry;
private volatile String propsFile = TestStandardReportingContext.class.getResource("/flowcontrollertest.nifi.properties").getFile();
@Before
@@ -120,9 +122,10 @@ public class TestStandardReportingContext {
authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1);
variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
+ flowRegistry = Mockito.mock(FlowRegistryClient.class);
bulletinRepo = Mockito.mock(BulletinRepository.class);
- controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry);
+ controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry, flowRegistry);
}
@After
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index b55e98d..1b54c64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -46,6 +46,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
@@ -714,7 +715,8 @@ public class TestProcessorLifecycle {
final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
- new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()));
+ new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
+ mock(FlowRegistryClient.class));
return new FlowControllerAndSystemBundle(flowController, systemBundle);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index 7a49103..8044ede 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.NiFiProperties;
@@ -80,7 +81,8 @@ public class StandardFlowSerializerTest {
final VariableRegistry variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class);
- controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry);
+ controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
+ auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class));
serializer = new StandardFlowSerializer(encryptor);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index a28eb34..27e1678 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,9 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -53,6 +57,7 @@ public class MockProcessGroup implements ProcessGroup {
private final Map<String, ProcessorNode> processorMap = new HashMap<>();
private final FlowController flowController;
private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY);
+ private VersionControlInformation versionControlInfo;
public MockProcessGroup(final FlowController flowController) {
this.flowController = flowController;
@@ -625,4 +630,35 @@ public class MockProcessGroup implements ProcessGroup {
public Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName) {
return Collections.emptySet();
}
+
+ @Override
+ public Optional<String> getVersionedComponentId() {
+ return Optional.empty();
+ }
+
+ @Override
+ public void setVersionedComponentId(String versionedComponentId) {
+ }
+
+ @Override
+ public VersionControlInformation getVersionControlInformation() {
+ return versionControlInfo;
+ }
+
+ @Override
+ public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
+ }
+
+ @Override
+ public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) {
+ }
+
+ @Override
+ public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) {
+ }
+
+ @Override
+ public void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds) {
+ this.versionControlInfo = versionControlInformation;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index faa8c0e..84e582c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.web;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
@@ -23,6 +31,11 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
@@ -59,6 +72,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.UserDTO;
import org.apache.nifi.web.api.dto.UserGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
@@ -101,13 +115,9 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowEntity;
/**
* Defines the NiFiServiceFacade interface.
@@ -491,6 +501,14 @@ public interface NiFiServiceFacade {
ProcessorEntity getProcessor(String id);
/**
+ * Gets the Processor transfer object for the specified id, as it is visible to the given user
+ *
+ * @param id Id of the processor to return
+ * @return The Processor transfer object
+ */
+ ProcessorEntity getProcessor(String id, NiFiUser user);
+
+ /**
* Gets the processor status.
*
* @param id id
@@ -1066,6 +1084,16 @@ public interface NiFiServiceFacade {
RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId);
/**
+ * Gets a remote process group as it is visible to the given user
+ *
+ * @param remoteProcessGroupId The id of the remote process group
+ * @param user the user requesting the action
+ * @return group
+ */
+ RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId, NiFiUser user);
+
+
+ /**
* Gets all remote process groups in the a given parent group.
*
* @param groupId The id of the parent group
@@ -1074,6 +1102,15 @@ public interface NiFiServiceFacade {
Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId);
/**
+ * Gets all remote process groups in a given parent group as they are visible to the given user
+ *
+ * @param groupId The id of the parent group
+ * @param user the user making the request
+ * @return group
+ */
+ Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId, NiFiUser user);
+
+ /**
* Gets the remote process group status.
*
* @param id remote process group
@@ -1220,6 +1257,132 @@ public interface NiFiServiceFacade {
*/
FunnelEntity deleteFunnel(Revision revision, String funnelId);
+
+ // ----------------------------------------
+ // Version Control methods
+ // ----------------------------------------
+
+ /**
+ * Returns the Version Control information for the Process Group with the given ID
+ *
+ * @param processGroupId the ID of the Process Group
+ * @return the Version Control information that corresponds to the given Process Group, or <code>null</code> if the
+ * process group is not under version control
+ */
+ VersionControlInformationEntity getVersionControlInformation(String processGroupId);
+
+
+ /**
+ * Adds the given Versioned Flow to the registry specified by the given ID
+ *
+ * @param registryId the ID of the registry
+ * @param flow the flow to add to the registry
+ * @return a VersionedFlow that is fully populated, including identifiers
+ *
+ * @throws IOException if unable to communicate with the Flow Registry
+ */
+ VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException;
+
+ /**
+ * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
+ * and adds the snapshot of the Process Group as the first version of that flow.
+ *
+ * @param groupId the UUID of the Process Group
+ * @param requestEntity the details of the flow to create
+ * @return a VersionControlComponentMappingEntity that contains the information needed to tell a Process Group which versioned flow it is tracking and to map
+ * component IDs to their Versioned Component IDs
+ */
+ VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, VersionedFlowEntity requestEntity);
+
+ /**
+ * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
+ *
+ * @param registryId the ID of the Flow Registry to persist the snapshot to
+ * @param flow the flow where the snapshot should be persisted
+ * @param snapshot the Snapshot to persist
+ * @param comments about the snapshot
+ * @return the snapshot that represents what was stored in the registry
+ *
+ * @throws IOException if unable to communicate with the Flow Registry
+ */
+ VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+
+ /**
+ * Updates the Version Control Information on the Process Group with the given ID
+ *
+ * @param processGroupRevision the Revision of the Process Group
+ * @param processGroupId the ID of the process group to update
+ * @param versionControlInfo the new Version Control Information
+ * @param versionedComponentMapping a mapping of component ID to Versioned Component ID
+ *
+ * @return a VersionControlInformationEntity that represents the newly updated Version Control information
+ */
+ VersionControlInformationEntity setVersionControlInformation(Revision processGroupRevision, String processGroupId, VersionControlInformationDTO versionControlInfo,
+ Map<String, String> versionedComponentMapping);
+
+
+ /**
+ * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO
+ *
+ * @param versionControlInfo the coordinates of the versioned flow
+ * @return the VersionedFlowSnapshot that corresponds to the given coordinates
+ *
+ * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
+ */
+ VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
+
+ /**
+ * Determines which components currently exist in the Process Group with the given identifier and calculates which of those components
+ * would be impacted by updating the Process Group to the provided snapshot
+ *
+ * @param processGroupId the ID of the Process Group to update
+ * @param updatedSnapshot the snapshot to update the Process Group to
+ * @param user the user making the request
+ * @return the set of all components that would be affected by updating the Process Group
+ */
+ Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException;
+
+ /**
+ * Verifies that the Process Group with the given identifier can be updated to the proposed flow
+ *
+ * @param groupId the ID of the Process Group to update
+ * @param proposedFlow the proposed flow
+ * @param verifyConnectionRemoval whether or not to verify that connections that no longer exist in the proposed flow are eligible for deletion
+ * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
+ * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
+ * throw an IllegalStateException
+ */
+ void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
+
+ /**
+ * Updates the Process group with the given ID to match the new snapshot
+ *
+ * @param revision the revision of the Process Group
+ * @param groupId the ID of the Process Group
+ * @param versionControlInfo the Version Control information
+ * @param snapshot the new snapshot
+ * @param componentIdSeed the seed to use for generating new component ID's
+ * @return the Process Group
+ */
+ ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
+ boolean verifyNotModified);
+
+ /**
+ * Updates the Process group with the given ID to match the new snapshot
+ *
+ * @param user the user performing the request
+ * @param revision the revision of the Process Group
+ * @param groupId the ID of the Process Group
+ * @param versionControlInfo the Version Control information
+ * @param snapshot the new snapshot
+ * @param componentIdSeed the seed to use for generating new component ID's
+ * @return the Process Group
+ */
+ ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
+ boolean verifyNotModified);
+
+ void setFlowRegistryClient(FlowRegistryClient flowRegistryClient);
+
// ----------------------------------------
// Component state methods
// ----------------------------------------
@@ -1290,6 +1453,7 @@ public interface NiFiServiceFacade {
*/
void clearReportingTaskState(String reportingTaskId);
+
// ----------------------------------------
// Label methods
// ----------------------------------------
@@ -1510,6 +1674,15 @@ public interface NiFiServiceFacade {
ControllerServiceEntity getControllerService(String controllerServiceId);
/**
+ * Gets the specified controller service as it is visible to the given user
+ *
+ * @param controllerServiceId id
+ * @param user the user making the request
+ * @return service
+ */
+ ControllerServiceEntity getControllerService(String controllerServiceId, NiFiUser user);
+
+ /**
* Get the descriptor for the specified property of the specified controller service.
*
* @param id id
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
index d0230db..f7f3b90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
@@ -138,6 +138,12 @@ public class NiFiServiceFacadeLock {
return proceedWithReadLock(proceedingJoinPoint);
}
+ @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+ + "execution(* register*(..))")
+ public Object registerLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+ return proceedWithReadLock(proceedingJoinPoint);
+ }
+
private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
final long beforeLock = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9caabd6..d3a5fd0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,32 @@
*/
package org.apache.nifi.web;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@@ -85,6 +110,31 @@ import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
@@ -140,6 +190,8 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.UserDTO;
import org.apache.nifi.web.api.dto.UserGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -197,6 +249,9 @@ import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -224,29 +279,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -285,6 +318,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// administrative services
private AuditService auditService;
+ // flow registry
+ private FlowRegistryClient flowRegistryClient;
+
// properties
private NiFiProperties properties;
private DtoFactory dtoFactory;
@@ -925,7 +961,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ScheduleComponentsEntity> update() {
// schedule the components
- processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
+ processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
// update the revisions
final Map<String, Revision> updatedRevisions = new HashMap<>();
@@ -950,7 +986,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
-
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return activateControllerServices(user, processGroupId, state, serviceRevisions);
}
@@ -1010,6 +1045,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions);
}
+
@Override
public NodeDTO updateNode(final NodeDTO nodeDTO) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -2512,9 +2548,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createStatusHistoryEntity(dto, permissions);
}
- private ProcessorEntity createProcessorEntity(final ProcessorNode processor) {
+ private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -2524,8 +2560,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
return processors.stream()
- .map(processor -> createProcessorEntity(processor))
+ .map(processor -> createProcessorEntity(processor, user))
.collect(Collectors.toSet());
}
@@ -2582,8 +2619,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ProcessorEntity getProcessor(final String id) {
+ return getProcessor(id, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public ProcessorEntity getProcessor(final String id, final NiFiUser user) {
final ProcessorNode processor = processorDAO.getProcessor(id);
- return createProcessorEntity(processor);
+ return createProcessorEntity(processor, user);
}
@Override
@@ -3103,9 +3145,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
.collect(Collectors.toSet());
}
- private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg) {
+ private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier()));
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg);
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user);
final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -3114,9 +3156,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
+ return getRemoteProcessGroups(groupId, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId, final NiFiUser user) {
final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId);
return rpgs.stream()
- .map(rpg -> createRemoteGroupEntity(rpg))
+ .map(rpg -> createRemoteGroupEntity(rpg, user))
.collect(Collectors.toSet());
}
@@ -3150,8 +3197,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
+ return getRemoteProcessGroup(remoteProcessGroupId, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId, final NiFiUser user) {
final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
- return createRemoteGroupEntity(rpg);
+ return createRemoteGroupEntity(rpg, user);
}
@Override
@@ -3307,8 +3359,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ControllerServiceEntity getControllerService(final String controllerServiceId) {
+ return getControllerService(controllerServiceId, NiFiUserUtils.getNiFiUser());
+ }
+
+ @Override
+ public ControllerServiceEntity getControllerService(final String controllerServiceId, final NiFiUser user) {
final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
- return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser());
+ return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), user);
}
@Override
@@ -3375,6 +3432,415 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createStatusHistoryEntity(dto, permissions);
}
+ @Override
+ public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final VersionedFlowEntity requestEntity) {
+ // Create a VersionedProcessGroup snapshot of the flow as it is currently.
+ final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
+
+ final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
+ final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
+
+ final VersionedFlow versionedFlow = new VersionedFlow();
+ versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
+ versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
+ versionedFlow.setDescription(versionedFlowDto.getDescription());
+ versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
+ versionedFlow.setName(versionedFlowDto.getFlowName());
+ versionedFlow.setIdentifier(flowId);
+
+ // Add the Versioned Flow and first snapshot to the Flow Registry
+ final String registryId = requestEntity.getVersionedFlow().getRegistryId();
+ final VersionedFlowSnapshot registeredSnapshot;
+ final VersionedFlow registeredFlow;
+
+ String action = "create the flow";
+ try {
+ // first, create the flow in the registry, if necessary
+ if (versionedFlowDto.getFlowId() == null) {
+ registeredFlow = registerVersionedFlow(registryId, versionedFlow);
+ } else {
+ registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
+ }
+
+ action = "add the local flow to the Flow Registry as the first Snapshot";
+
+ // add first snapshot to the flow in the registry
+ final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
+ registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments);
+ } catch (final UnknownResourceException e) {
+ throw new IllegalArgumentException(e);
+ } catch (final IOException ioe) {
+ // will result in a 500: Internal Server Error
+ throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action);
+ }
+
+ // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
+ final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+ vci.setBucketId(registeredFlow.getBucketIdentifier());
+ vci.setCurrent(true);
+ vci.setFlowId(registeredFlow.getIdentifier());
+ vci.setGroupId(groupId);
+ vci.setModified(false);
+ vci.setRegistryId(registryId);
+ vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
+
+ final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
+
+ final Revision groupRevision = revisionManager.getRevision(groupId);
+ final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision);
+
+ final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity();
+ entity.setVersionControlInformation(vci);
+ entity.setProcessGroupRevision(groupRevisionDto);
+ entity.setVersionControlComponentMapping(mapping);
+ return entity;
+ }
+
+    /**
+     * Looks up the Version Control Information of the Process Group with the given ID.
+     *
+     * @param groupId the ID of the Process Group
+     * @return an entity wrapping the group's Version Control Information and its current revision,
+     *         or {@code null} if the group has no Version Control Information
+     */
+    @Override
+    public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
+        final VersionControlInformation vci = processGroupDAO.getProcessGroup(groupId).getVersionControlInformation();
+        if (vci != null) {
+            final VersionControlInformationDTO vciDto = dtoFactory.createVersionControlInformationDto(vci);
+            final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
+            return entityFactory.createVersionControlInformationEntity(vciDto, revisionDto);
+        }
+
+        // No Version Control Information is set on the group.
+        return null;
+    }
+
+    /**
+     * Maps the Process Group with the given ID, as it currently exists, into its versioned representation.
+     *
+     * @param processGroupId the ID of the Process Group to map
+     * @return the versioned form of the group produced by a {@code NiFiRegistryFlowMapper}
+     */
+    private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+        return new NiFiRegistryFlowMapper().mapProcessGroup(group, flowRegistryClient);
+    }
+
+    /**
+     * Registers the given flow with the Flow Registry that has the given ID.
+     *
+     * @param registryId the ID of the Flow Registry
+     * @param flow the flow to register
+     * @return the flow as returned by the registry
+     * @throws ResourceNotFoundException if no Flow Registry is known for the given ID
+     */
+    @Override
+    public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.registerVersionedFlow(flow);
+        }
+
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    /**
+     * Fetches a flow from the Flow Registry that has the given ID.
+     *
+     * @param registryId the ID of the Flow Registry
+     * @param bucketId the ID of the bucket holding the flow
+     * @param flowId the ID of the flow
+     * @return the flow as returned by the registry
+     * @throws ResourceNotFoundException if no Flow Registry is known for the given ID
+     */
+    private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.getVersionedFlow(bucketId, flowId);
+        }
+
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    /**
+     * Registers a snapshot of the given flow with the Flow Registry that has the given ID.
+     *
+     * @param registryId the ID of the Flow Registry
+     * @param flow the flow the snapshot belongs to
+     * @param snapshot the flow contents to register
+     * @param comments the comments to store with the snapshot
+     * @return the snapshot as returned by the registry
+     * @throws ResourceNotFoundException if no Flow Registry is known for the given ID
+     */
+    @Override
+    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
+        final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.registerVersionedFlowSnapshot(flow, snapshot, comments);
+        }
+
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    /**
+     * Applies the given Version Control Information and component mapping to a Process Group as a
+     * revision-tracked update.
+     *
+     * @param revision the revision of the Process Group the caller is updating
+     * @param processGroupId the ID of the Process Group
+     * @param versionControlInfo the Version Control Information to apply
+     * @param versionedComponentMapping the component mapping passed through to the DAO
+     * @return an entity wrapping the resulting Version Control Information and the revision of the update
+     */
+    @Override
+    public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
+        final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {
+
+        final ProcessGroup targetGroup = processGroupDAO.getProcessGroup(processGroupId);
+
+        final RevisionUpdate<VersionControlInformationDTO> update = updateComponent(revision, targetGroup,
+            () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
+            updatedGroup -> dtoFactory.createVersionControlInformationDto(updatedGroup.getVersionControlInformation()));
+
+        final VersionControlInformationDTO updatedInfo = update.getComponent();
+        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(update.getLastModification());
+        return entityFactory.createVersionControlInformationEntity(updatedInfo, updatedRevision);
+    }
+
+    /**
+     * Delegates to the Process Group with the given ID to verify that it can be updated to the proposed flow.
+     *
+     * @param groupId the ID of the Process Group
+     * @param proposedFlow the flow snapshot the group would be updated to
+     * @param verifyConnectionRemoval passed through to the Process Group's verification
+     * @param verifyNotDirty passed through to the Process Group's verification
+     */
+    @Override
+    public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+        processGroupDAO.getProcessGroup(groupId).verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
+    }
+
+ @Override
+ public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
+ final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+ final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient);
+
+ final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
+ final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents());
+
+ final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow);
+ final FlowComparison comparison = flowComparator.compare();
+
+ final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
+ .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
+ .map(difference -> {
+ final VersionedComponent localComponent = difference.getComponentA();
+
+ final String state;
+ switch (localComponent.getComponentType()) {
+ case CONTROLLER_SERVICE:
+ final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
+ state = controllerServiceDAO.getControllerService(serviceId).getState().name();
+ break;
+ case PROCESSOR:
+ final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
+ state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
+ break;
+ case REMOTE_INPUT_PORT:
+ final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+ state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
+ break;
+ case REMOTE_OUTPUT_PORT:
+ final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+ state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
+ break;
+ default:
+ state = null;
+ break;
+ }
+
+ return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state, user);
+ })
+ .collect(Collectors.toCollection(HashSet::new));
+
+ for (final FlowDifference difference : comparison.getDifferences()) {
+ VersionedComponent component = difference.getComponentA();
+ if (component == null) {
+ component = difference.getComponentB();
+ }
+
+ if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
+ final VersionedConnection connection = (VersionedConnection) component;
+
+ final ConnectableComponent source = connection.getSource();
+ final ConnectableComponent destination = connection.getDestination();
+
+ affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user));
+ affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user));
+ }
+ }
+
+ return affectedComponents;
+ }
+
+    /**
+     * Returns the name of the current state of the given connectable component.
+     *
+     * @param localComponent the local component to inspect
+     * @return the state name for processors and remote input/output ports, or {@code null} for any other type
+     */
+    private String getComponentState(final InstantiatedConnectableComponent localComponent) {
+        final String instanceId = localComponent.getInstanceId();
+        final String instanceGroupId = localComponent.getInstanceGroupId();
+
+        final String state;
+        switch (localComponent.getType()) {
+            case PROCESSOR:
+                state = processorDAO.getProcessor(instanceId).getPhysicalScheduledState().name();
+                break;
+            case REMOTE_INPUT_PORT:
+                // remote ports are resolved through the Remote Process Group named by the instance group ID
+                state = remoteProcessGroupDAO.getRemoteProcessGroup(instanceGroupId).getInputPort(instanceId).getScheduledState().name();
+                break;
+            case REMOTE_OUTPUT_PORT:
+                state = remoteProcessGroupDAO.getRemoteProcessGroup(instanceGroupId).getOutputPort(instanceId).getScheduledState().name();
+                break;
+            default:
+                state = null;
+                break;
+        }
+
+        return state;
+    }
+
+    /**
+     * Builds an AffectedComponentEntity for a local component instance whose type name and state are
+     * supplied by the caller.
+     *
+     * @param instance the local component instance
+     * @param componentTypeName the component type name, used as the reference type and to resolve the Authorizable
+     * @param componentState the state to report for the component (may be {@code null})
+     * @param user the user for whom permissions are computed
+     * @return the populated entity
+     */
+    private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
+        final String instanceId = instance.getInstanceId();
+
+        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(instanceId));
+        final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(getAuthorizable(componentTypeName, instance), user);
+
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(instanceId);
+        componentDto.setReferenceType(componentTypeName);
+        componentDto.setProcessGroupId(instance.getInstanceGroupId());
+        componentDto.setState(componentState);
+
+        final AffectedComponentEntity affected = new AffectedComponentEntity();
+        affected.setRevision(revisionDto);
+        affected.setId(instanceId);
+        affected.setPermissions(permissionsDto);
+        affected.setComponent(componentDto);
+        return affected;
+    }
+
+    /**
+     * Builds an AffectedComponentEntity for a connectable component, deriving the type name from the
+     * component's type and the state from {@code getComponentState}.
+     *
+     * @param instance the local connectable component
+     * @param user the user for whom permissions are computed
+     * @return the populated entity
+     */
+    private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) {
+        final String instanceId = instance.getInstanceId();
+        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(instanceId));
+
+        final String typeName = instance.getType().name();
+        final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(getAuthorizable(typeName, instance), user);
+
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(instanceId);
+        componentDto.setReferenceType(typeName);
+        componentDto.setProcessGroupId(instance.getInstanceGroupId());
+        componentDto.setState(getComponentState(instance));
+
+        final AffectedComponentEntity affected = new AffectedComponentEntity();
+        affected.setRevision(revisionDto);
+        affected.setId(instanceId);
+        affected.setPermissions(permissionsDto);
+        affected.setComponent(componentDto);
+        return affected;
+    }
+
+    /**
+     * Resolves the Authorizable that governs access to the given local component instance.
+     *
+     * <p>Remote input and output ports are authorized through the Remote Process Group identified by their
+     * instance group ID. A Remote Process Group is authorized through its own instance ID.</p>
+     *
+     * @param componentTypeName the name of an {@code org.apache.nifi.registry.flow.ComponentType} constant
+     * @param versionedComponent the instantiated component whose Authorizable is wanted
+     * @return the matching Authorizable, or {@code null} if the type name is not one of the handled types
+     */
+    private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
+        final String componentId = versionedComponent.getInstanceId();
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) {
+            return authorizableLookup.getControllerService(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) {
+            return authorizableLookup.getConnection(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) {
+            return authorizableLookup.getFunnel(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) {
+            return authorizableLookup.getInputPort(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) {
+            return authorizableLookup.getOutputPort(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) {
+            return authorizableLookup.getLabel(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) {
+            return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) {
+            return authorizableLookup.getProcessor(componentId).getAuthorizable();
+        }
+
+        // Remote ports have no Authorizable of their own here; they are governed by the Remote Process Group
+        // named by their instance group ID.
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())
+            || componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) {
+            return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
+            // The Remote Process Group is itself the component, so look it up by its own instance ID.
+            // NOTE(review): its instance group ID presumably names the enclosing Process Group, which is not
+            // a valid Remote Process Group ID - confirm against NiFiRegistryFlowMapper.
+            return authorizableLookup.getRemoteProcessGroup(componentId);
+        }
+
+        return null;
+    }
+
+    /**
+     * Fetches the flow snapshot described by the given Version Control Information from its Flow Registry
+     * and fills in any child Process Groups that reference remote flows.
+     *
+     * @param versionControlInfo identifies the registry, bucket, flow and version to fetch
+     * @return the fetched snapshot with versioned child flows populated
+     * @throws ResourceNotFoundException if no Flow Registry is known for the given registry ID
+     * @throws IllegalArgumentException if the registry reports that the requested flow version does not exist;
+     *         the registry's exception is kept as the cause
+     * @throws IOException if communication with the Flow Registry fails
+     */
+    @Override
+    public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
+        if (flowRegistry == null) {
+            throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
+        }
+
+        final VersionedFlowSnapshot snapshot;
+        try {
+            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
+        } catch (final UnknownResourceException e) {
+            // keep the registry's exception as the cause so the original failure stays diagnosable
+            throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+                + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion(), e);
+        }
+
+        // If this Flow has a reference to a remote flow, we need to pull that remote flow as well
+        populateVersionedChildFlows(snapshot);
+
+        return snapshot;
+    }
+
+    /**
+     * Populates versioned flow contents for every child Process Group of the snapshot's top-level group.
+     * The top-level group itself is not passed to {@code populateVersionedFlows}; only its children are.
+     *
+     * @param snapshot the snapshot whose child groups are to be populated
+     */
+    private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
+        for (final VersionedProcessGroup childGroup : snapshot.getFlowContents().getProcessGroups()) {
+            populateVersionedFlows(childGroup);
+        }
+    }
+
+    /**
+     * If the given group carries remote flow coordinates, fetches the referenced flow from its Flow Registry
+     * and copies the fetched contents (comments, position, name, variables and all component sets) onto the
+     * group. Then recurses into the group's child Process Groups.
+     *
+     * @param group the versioned Process Group to populate in place
+     * @throws IllegalArgumentException if no Flow Registry is registered for the referenced URL, or the registry
+     *         reports that the referenced flow version does not exist (the registry's exception is the cause)
+     * @throws ResourceNotFoundException if the Flow Registry cannot be obtained for the resolved registry ID
+     * @throws IOException if communication with the Flow Registry fails
+     */
+    private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
+        final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates();
+
+        if (remoteCoordinates != null) {
+            final String registryUrl = remoteCoordinates.getRegistryUrl();
+            final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
+            if (registryId == null) {
+                throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl
+                    + "], but no Flow Registry is currently registered for that URL.");
+            }
+
+            // Guard against a missing registry the same way the other registry lookups in this class do,
+            // instead of failing with a NullPointerException below.
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+            if (flowRegistry == null) {
+                throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
+            }
+
+            final VersionedFlowSnapshot childSnapshot;
+            try {
+                childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
+            } catch (final UnknownResourceException e) {
+                // keep the registry's exception as the cause so the original failure stays diagnosable
+                throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+                    + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion(), e);
+            }
+
+            final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents();
+            group.setComments(fetchedGroup.getComments());
+            group.setPosition(fetchedGroup.getPosition());
+            group.setName(fetchedGroup.getName());
+            group.setVariables(fetchedGroup.getVariables());
+
+            // Copy each component set so the group does not share collection instances with the fetched snapshot.
+            group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections()));
+            group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices()));
+            group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels()));
+            group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts()));
+            group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels()));
+            group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts()));
+            group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups()));
+            group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors()));
+            group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups()));
+        }
+
+        // Recurse after the copy above, so child groups brought in from the fetched flow are populated too.
+        for (final VersionedProcessGroup child : group.getProcessGroups()) {
+            populateVersionedFlows(child);
+        }
+    }
+
+
+    /**
+     * Updates a Process Group to the proposed flow snapshot on behalf of the user returned by
+     * {@code NiFiUserUtils.getNiFiUser()}, delegating to the overload that takes an explicit user.
+     */
+    @Override
+    public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+
+        return updateProcessGroup(NiFiUserUtils.getNiFiUser(), revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified);
+    }
+
+    /**
+     * Updates a Process Group to the proposed flow snapshot as a revision-tracked update performed on behalf
+     * of the given user, and returns the resulting Process Group entity.
+     *
+     * @param user the user performing the update; also the user for whom the returned permissions are computed
+     * @param revision the revision of the Process Group the caller is updating
+     * @param groupId the ID of the Process Group
+     * @param versionControlInfo the Version Control Information passed through to the DAO
+     * @param proposedFlowSnapshot the flow snapshot to update the group to
+     * @param componentIdSeed passed through to the DAO's flow update
+     * @param verifyNotModified passed through to the DAO's flow update
+     * @return the updated Process Group entity with revision, permissions, status and bulletins
+     */
+    @Override
+    public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+
+        final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
+        final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
+            processGroupNode,
+            () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified),
+            processGroup -> dtoFactory.createProcessGroupDto(processGroup));
+
+        // Compute permissions for the user supplied by the caller rather than whichever user (if any)
+        // is bound to the current thread.
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode, user);
+        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+        final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
+        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
+        // Bulletin details are only exposed when the user may read the Process Group.
+        final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+        return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
+    }
+
+
+ @Override // Sets the FlowRegistryClient this facade uses for its Flow Registry lookups.
+ public void setFlowRegistryClient(final FlowRegistryClient client) {
+ this.flowRegistryClient = client; // no null check is performed; the reference is stored as given
+ }
+
private AuthorizationResult authorizeAction(final Action action) {
final String sourceId = action.getSourceId();
final Component type = action.getSourceType();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 7118e01..531823a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,40 +16,8 @@
*/
package org.apache.nifi.web.api;
-import static javax.ws.rs.core.Response.Status.NOT_FOUND;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@@ -69,6 +37,7 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.exception.BadRequestException;
@@ -77,6 +46,7 @@ import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.ComponentIdGenerator;
+import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
@@ -87,9 +57,45 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.util.CacheKey;
import org.apache.nifi.web.util.WebUtils;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+
/**
* Base class for controllers.
*/
@@ -221,6 +227,16 @@ public abstract class ApplicationResource {
return Optional.of(idGenerationSeed);
}
+    /**
+     * Creates a Jersey client whose connect and read timeouts are taken from the cluster node connection
+     * and read timeout properties, which follows redirects, and which uses an SSL context built from the
+     * NiFi properties.
+     *
+     * @return the configured client
+     */
+    protected Client createJerseyClient() {
+        final NiFiProperties nifiProperties = getProperties();
+        final ClientConfig config = new ClientConfig();
+
+        // Both timeouts are configured as duration strings; convert them to whole milliseconds.
+        final int connectTimeoutMillis = (int) FormatUtils.getTimeDuration(nifiProperties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS);
+        final int readTimeoutMillis = (int) FormatUtils.getTimeDuration(nifiProperties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
+
+        config.property(ClientProperties.READ_TIMEOUT, readTimeoutMillis);
+        config.property(ClientProperties.CONNECT_TIMEOUT, connectTimeoutMillis);
+        config.property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE);
+
+        return WebUtils.createClient(config, SslContextFactory.createSslContext(nifiProperties));
+    }
/**
* Generates an Ok response with no content.
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index c9aea4f..38e8891 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -595,6 +595,24 @@ public class FlowResource extends ApplicationResource {
componentIds.add(outputPort.getIdentifier());
});
+ // ensure authorized for each remote input port we will attempt to schedule
+ group.findAllRemoteProcessGroups().stream()
+ .flatMap(rpg -> rpg.getInputPorts().stream())
+ .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+ .forEach(port -> {
+ componentIds.add(port.getIdentifier());
+ });
+
+ // ensure authorized for each remote output port we will attempt to schedule
+ group.findAllRemoteProcessGroups().stream()
+ .flatMap(rpg -> rpg.getOutputPorts().stream())
+ .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+ .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+ .forEach(port -> {
+ componentIds.add(port.getIdentifier());
+ });
+
return componentIds;
});
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index b866677..ebed0ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -763,7 +763,6 @@ public class ProcessGroupResource extends ApplicationResource {
private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest,
final VariableRegistryEntity requestEntity) throws InterruptedException, IOException {
-
final Pause pause = createPause(updateRequest);
// stop processors
@@ -805,8 +804,6 @@ public class ProcessGroupResource extends ApplicationResource {
logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
updateRequest.getStartProcessorsStep().setComplete(true);
}
-
- updateRequest.setComplete(true);
}
/**
@@ -1414,6 +1411,7 @@ public class ProcessGroupResource extends ApplicationResource {
* @param <T> type of class
* @return the response entity
*/
+ @SuppressWarnings("unchecked")
private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {