You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:21 UTC
[33/50] nifi git commit: NIFI-4436: Removed isCurrent,
isModified from VersionControlInformation and associated DTO. Bug
fixes & code refactoring
NIFI-4436: Removed isCurrent, isModified from VersionControlInformation and associated DTO. Bug fixes & code refactoring
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe8b30bf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe8b30bf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe8b30bf
Branch: refs/heads/master
Commit: fe8b30bf2608e6b321c54ad812e7a85e13acaa5f
Parents: db2cc9f
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 5 16:18:16 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:55 2018 -0500
----------------------------------------------------------------------
.../api/dto/VersionControlInformationDTO.java | 22 ---
.../manager/ProcessGroupEntityMerger.java | 2 +-
.../VersionControlInformationEntityMerger.java | 53 +++++-
.../flow/VersionControlInformation.java | 11 --
.../nifi/registry/flow/VersionedFlowState.java | 21 ++-
.../controller/StandardFlowSynchronizer.java | 5 +-
.../nifi/groups/StandardProcessGroup.java | 165 +++++++++----------
.../groups/StandardVersionedFlowStatus.java | 11 +-
.../nifi/groups/VersionControlFields.java | 61 +++++++
.../flow/StandardVersionControlInformation.java | 51 ++----
.../nifi/remote/StandardRemoteProcessGroup.java | 5 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 14 --
.../nifi/web/StandardNiFiServiceFacade.java | 109 +++++++++---
.../nifi/web/api/ProcessGroupResource.java | 77 +++++----
.../apache/nifi/web/api/VersionsResource.java | 33 ++--
.../org/apache/nifi/web/api/dto/DtoFactory.java | 38 +----
.../web/dao/impl/StandardProcessGroupDAO.java | 8 +-
.../dao/impl/StandardRemoteProcessGroupDAO.java | 5 +-
.../src/main/resources/nifi-web-api-context.xml | 2 -
19 files changed, 384 insertions(+), 309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
index 944b10a..21864b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
@@ -32,8 +32,6 @@ public class VersionControlInformationDTO {
private String flowName;
private String flowDescription;
private Integer version;
- private Boolean modified;
- private Boolean current;
private String state;
private String stateExplanation;
@@ -118,26 +116,6 @@ public class VersionControlInformationDTO {
this.version = version;
}
- @ApiModelProperty(readOnly=true,
- value = "Whether or not the flow has been modified since it was last synced to the Flow Registry. The value will be null if this information is not yet known.")
- public Boolean getModified() {
- return modified;
- }
-
- public void setModified(Boolean modified) {
- this.modified = modified;
- }
-
- @ApiModelProperty(readOnly=true,
- value = "Whether or not this is the most recent version of the flow in the Flow Registry. The value will be null if this information is not yet known.")
- public Boolean getCurrent() {
- return current;
- }
-
- public void setCurrent(Boolean current) {
- this.current = current;
- }
-
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
index 457e75b..d2eb749 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
@@ -55,7 +55,7 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
if (targetVersionControl == null) {
targetGroupDto.setVersionControlInformation(toMergeGroupDto.getVersionControlInformation());
} else if (toMergeVersionControl != null) {
- targetVersionControl.setCurrent(Boolean.TRUE.equals(targetVersionControl.getCurrent()) && Boolean.TRUE.equals(toMergeVersionControl.getCurrent()));
+ VersionControlInformationEntityMerger.updateFlowState(targetVersionControl, toMergeVersionControl);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java
index 8d102df..f8ab4c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java
@@ -20,6 +20,7 @@ package org.apache.nifi.cluster.manager;
import java.util.Map;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
@@ -37,12 +38,54 @@ public class VersionControlInformationEntityMerger {
.forEach(entity -> {
final VersionControlInformationDTO dto = entity.getVersionControlInformation();
- // We consider the flow to be current only if ALL nodes indicate that it is current
- clientDto.setCurrent(Boolean.TRUE.equals(clientDto.getCurrent()) && Boolean.TRUE.equals(dto.getCurrent()));
-
- // We consider the flow to be modified if ANY node indicates that it is modified
- clientDto.setModified(Boolean.TRUE.equals(clientDto.getModified()) || Boolean.TRUE.equals(dto.getModified()));
+ updateFlowState(clientDto, dto);
});
}
+
+ /**
+ * @param state the versioned flow state to inspect
+ * @return <code>true</code> if the state is UP_TO_DATE or LOCALLY_MODIFIED; <code>false</code> for every other state
+ */
+ private static boolean isCurrent(final VersionedFlowState state) {
+ return state == VersionedFlowState.UP_TO_DATE || state == VersionedFlowState.LOCALLY_MODIFIED;
+ }
+
+ /**
+ * @param state the versioned flow state to inspect
+ * @return <code>true</code> if the state is LOCALLY_MODIFIED or LOCALLY_MODIFIED_AND_STALE; <code>false</code> for every other state
+ */
+ private static boolean isModified(final VersionedFlowState state) {
+ return state == VersionedFlowState.LOCALLY_MODIFIED || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
+ }
+
+ /**
+ * Merges the versioned-flow state reported by one node into the state being accumulated for the client,
+ * updating <code>clientDto</code> in place.
+ * <p>
+ * A SYNC_FAILURE on either side takes precedence: if the client state is already SYNC_FAILURE it is left
+ * untouched, and if the node reports SYNC_FAILURE its state and explanation are copied to the client.
+ * Otherwise the merged flow is considered current only if BOTH sides are current, and modified if
+ * EITHER side is modified.
+ *
+ * @param clientDto the DTO that accumulates the merged result; its state and state explanation are overwritten
+ * @param dto the DTO reported by a node, whose state is merged into <code>clientDto</code>
+ */
+ public static void updateFlowState(final VersionControlInformationDTO clientDto, final VersionControlInformationDTO dto) {
+ final VersionedFlowState clientState = VersionedFlowState.valueOf(clientDto.getState());
+ if (clientState == VersionedFlowState.SYNC_FAILURE) {
+ // A failure already recorded for the client is never replaced by another node's state
+ return;
+ }
+
+ final VersionedFlowState dtoState = VersionedFlowState.valueOf(dto.getState());
+ if (dtoState == VersionedFlowState.SYNC_FAILURE) {
+ // Propagate the node's failure, keeping the node's own explanation
+ clientDto.setState(dto.getState());
+ clientDto.setStateExplanation(dto.getStateExplanation());
+ return;
+ }
+
+ final boolean clientCurrent = isCurrent(clientState);
+ final boolean clientModified = isModified(clientState);
+
+ final boolean dtoCurrent = isCurrent(dtoState);
+ final boolean dtoModified = isModified(dtoState);
+
+ // We consider the flow to be current only if ALL nodes indicate that it is current
+ final boolean current = clientCurrent && dtoCurrent;
+ final boolean stale = !current;
+
+ // We consider the flow to be modified if ANY node indicates that it is modified; requiring every
+ // node to agree would hide a modification that only some of the nodes have detected
+ final boolean modified = clientModified || dtoModified;
+
+ final VersionedFlowState flowState;
+ if (modified && stale) {
+ flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
+ } else if (modified) {
+ flowState = VersionedFlowState.LOCALLY_MODIFIED;
+ } else if (stale) {
+ flowState = VersionedFlowState.STALE;
+ } else {
+ flowState = VersionedFlowState.UP_TO_DATE;
+ }
+
+ clientDto.setState(flowState.name());
+ clientDto.setStateExplanation(flowState.getDescription());
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
index 1f65a19..bb4e0d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
@@ -66,17 +66,6 @@ public interface VersionControlInformation {
int getVersion();
/**
- * @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
- * to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry.
- */
- boolean isModified();
-
- /**
- * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry, <code>false</code> otherwise.
- */
- boolean isCurrent();
-
- /**
* @return the current status of the Process Group as it relates to the associated Versioned Flow.
*/
VersionedFlowStatus getStatus();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
index d20a13f..35b436d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java
@@ -22,31 +22,42 @@ public enum VersionedFlowState {
/**
* We are unable to communicate with the Flow Registry in order to determine the appropriate state
*/
- SYNC_FAILURE,
+ SYNC_FAILURE("Failed to communicate with Flow Registry"),
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is
* stored in the Flow Registry.
*/
- LOCALLY_MODIFIED,
+ LOCALLY_MODIFIED("Local changes have been made"),
/**
* This Process Group has not been modified since it was last synchronized with the Flow Registry, but
* the Flow Registry has a newer version of the flow than what is contained in this Process Group.
*/
- STALE,
+ STALE("A newer version of this flow is available"),
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has
* a newer version of the flow than what is contained in this Process Group.
*/
- LOCALLY_MODIFIED_AND_STALE,
+ LOCALLY_MODIFIED_AND_STALE("Local changes have been made and a newer version of this flow is available"),
/**
* This Process Group and all child/descendant Process Groups are on the latest version of the flow in
* the Flow Registry and have no local modifications.
*/
- UP_TO_DATE;
+ UP_TO_DATE("Flow version is current");
+
+
+ private final String description;
+
+ /**
+ * @param description the text stored for this state and returned by getDescription()
+ */
+ private VersionedFlowState(final String description) {
+ this.description = description;
+ }
+
+ /**
+ * @return the description text that was supplied for this state when the constant was declared
+ */
+ public String getDescription() {
+ return description;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 9cbf323..9bb3d2f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -88,6 +88,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -1116,10 +1117,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final FlowRegistry flowRegistry = controller.getFlowRegistryClient().getFlowRegistry(versionControlInfoDto.getRegistryId());
final String registryName = flowRegistry == null ? versionControlInfoDto.getRegistryId() : flowRegistry.getName();
+ versionControlInfoDto.setState(VersionedFlowState.SYNC_FAILURE.name());
+ versionControlInfoDto.setStateExplanation("Process Group has not yet been synchronized with the Flow Registry");
final StandardVersionControlInformation versionControlInformation = StandardVersionControlInformation.Builder.fromDto(versionControlInfoDto)
.registryName(registryName)
- .modified(false)
- .current(true)
.build();
// pass empty map for the version control mapping because the VersionedComponentId has already been set on the components
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index fb3d3a6..7d184df 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -169,8 +169,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
- private final AtomicReference<StandardVersionedFlowStatus> flowStatus = new AtomicReference<>(
- new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null));
+ private final VersionControlFields versionControlFields = new VersionControlFields();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -339,14 +338,22 @@ public final class StandardProcessGroup implements ProcessGroup {
// update the vci counts for this child group
final VersionControlInformation vci = childGroup.getVersionControlInformation();
if (vci != null) {
- if (vci.isModified() && !vci.isCurrent()) {
- locallyModifiedAndStale += 1;
- } else if (!vci.isCurrent()) {
- stale += 1;
- } else if (vci.isModified()) {
- locallyModified += 1;
- } else {
- upToDate += 1;
+ switch (vci.getStatus().getState()) {
+ case LOCALLY_MODIFIED:
+ locallyModified++;
+ break;
+ case LOCALLY_MODIFIED_AND_STALE:
+ locallyModifiedAndStale++;
+ break;
+ case STALE:
+ stale++;
+ break;
+ case SYNC_FAILURE:
+ syncFailure++;
+ break;
+ case UP_TO_DATE:
+ upToDate++;
+ break;
}
}
@@ -2938,17 +2945,9 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
- clearFlowDifferences();
+ versionControlFields.setFlowDifferences(null);
}
- private void clearFlowDifferences() {
- boolean updated = false;
- while (!updated) {
- final StandardVersionedFlowStatus status = flowStatus.get();
- final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null);
- updated = flowStatus.compareAndSet(status, updatedStatus);
- }
- }
@Override
public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
@@ -2959,8 +2958,6 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getFlowIdentifier(),
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
- versionControlInformation.isModified(),
- versionControlInformation.isCurrent(),
versionControlInformation.getStatus()) {
@Override
@@ -2970,60 +2967,50 @@ public final class StandardProcessGroup implements ProcessGroup {
return registry == null ? registryId : registry.getName();
}
- @Override
- public boolean isModified() {
- boolean updated = false;
- while (true) {
- final StandardVersionedFlowStatus status = flowStatus.get();
- Set<FlowDifference> differences = status.getCurrentDifferences();
+ private boolean isModified() {
+ Set<FlowDifference> differences = versionControlFields.getFlowDifferences();
+ if (differences == null) {
+ differences = getModifications();
if (differences == null) {
- differences = getModifications();
- if (differences == null) {
- return false;
- }
-
- final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences);
- updated = flowStatus.compareAndSet(status, updatedStatus);
-
- if (updated) {
- return !differences.isEmpty();
- }
-
- continue;
+ return false;
}
- return !differences.isEmpty();
+ versionControlFields.setFlowDifferences(differences);
}
+
+ return !differences.isEmpty();
}
@Override
public VersionedFlowStatus getStatus() {
// If current state is a sync failure, then report that failure instead of deriving a state from modifications/staleness
- final StandardVersionedFlowStatus status = flowStatus.get();
- final VersionedFlowState state = status.getState();
- if (state == VersionedFlowState.SYNC_FAILURE) {
- return status;
+ final String syncFailureExplanation = versionControlFields.getSyncFailureExplanation();
+ if (syncFailureExplanation != null) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
}
final boolean modified = isModified();
if (!modified) {
final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
if (vci.getFlowSnapshot() == null) {
- return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null);
+ return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
}
}
- final boolean stale = !isCurrent();
+ final boolean stale = versionControlFields.isStale();
+ final VersionedFlowState flowState;
if (modified && stale) {
- return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null);
+ flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
} else if (modified) {
- return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null);
+ flowState = VersionedFlowState.LOCALLY_MODIFIED;
} else if (stale) {
- return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null);
+ flowState = VersionedFlowState.STALE;
} else {
- return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null);
+ flowState = VersionedFlowState.UP_TO_DATE;
}
+
+ return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
}
};
@@ -3031,11 +3018,23 @@ public final class StandardProcessGroup implements ProcessGroup {
svci.setFlowName(versionControlInformation.getFlowName());
svci.setFlowDescription(versionControlInformation.getFlowDescription());
+ final VersionedFlowState flowState = versionControlInformation.getStatus().getState();
+ versionControlFields.setStale(flowState == VersionedFlowState.STALE || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
+ versionControlFields.setLocallyModified(flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
+ versionControlFields.setSyncFailureExplanation(null);
+
writeLock.lock();
try {
updateVersionedComponentIds(this, versionedComponentIds);
this.versionControlInfo.set(svci);
- clearFlowDifferences();
+ versionControlFields.setFlowDifferences(null);
+
+ final ProcessGroup parent = getParent();
+ if (parent != null) {
+ parent.onComponentModified();
+ }
+
+ scheduler.submitFrameworkTask(() -> synchronizeWithFlowRegistry(flowController.getFlowRegistryClient()));
} finally {
writeLock.unlock();
}
@@ -3156,14 +3155,6 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
- private void setSyncFailedState(final String explanation) {
- boolean updated = false;
- while (!updated) {
- final StandardVersionedFlowStatus status = flowStatus.get();
- final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences());
- updated = flowStatus.compareAndSet(status, updatedStatus);
- }
- }
@Override
public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
@@ -3177,7 +3168,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (flowRegistry == null) {
final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
- setSyncFailedState(message);
+ versionControlFields.setSyncFailureExplanation(message);
LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
@@ -3195,7 +3186,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
- setSyncFailedState(message);
+ versionControlFields.setSyncFailureExplanation(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
@@ -3213,22 +3204,17 @@ public final class StandardProcessGroup implements ProcessGroup {
if (latestVersion == vci.getVersion()) {
LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
- vci.setCurrent(true);
+ versionControlFields.setStale(false);
} else {
- vci.setCurrent(false);
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
new Object[] {this, vci.getVersion(), latestVersion});
+ versionControlFields.setStale(true);
}
- boolean updated = false;
- while (!updated) {
- final StandardVersionedFlowStatus status = flowStatus.get();
- final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences());
- updated = flowStatus.compareAndSet(status, updatedStatus);
- }
+ versionControlFields.setSyncFailureExplanation(null);
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage());
- setSyncFailedState(message);
+ versionControlFields.setSyncFailureExplanation(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
}
@@ -3253,10 +3239,6 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<String> updatedVersionedComponentIds = new HashSet<>();
for (final FlowDifference diff : flowComparison.getDifferences()) {
- if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) {
- continue;
- }
-
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
@@ -3393,6 +3375,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+ final VersionedFlowState flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
+
final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
.registryId(registryId)
.registryName(registryName)
@@ -3402,8 +3386,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.flowName(flowId)
.version(version)
.flowSnapshot(proposed)
- .modified(false)
- .current(remoteCoordinates.getLatest())
+ .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
.build();
group.setVersionControlInformation(vci, Collections.emptyMap());
@@ -4149,13 +4132,9 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences();
- final Set<FlowDifference> functionalDifferences = differences.stream()
- .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED)
- .filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED)
- .collect(Collectors.toSet());
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
- return functionalDifferences;
+ return differences;
}
@@ -4170,7 +4149,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
if (verifyNotDirty) {
- final boolean modified = versionControlInfo.isModified();
+ final VersionedFlowState flowState = versionControlInfo.getStatus().getState();
+ final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
final Set<FlowDifference> modifications = getModifications();
@@ -4186,7 +4166,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ " because the Process Group has been modified (" + modifications.size()
+ " modifications) since it was last synchronized with the Flow Registry. The Process Group must be"
- + " reverted to its original form before changing the version. See logs for more information on what has changed.");
+ + " reverted to its original form before changing the version.");
}
}
}
@@ -4393,8 +4373,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (flowId != null && flowId.equals(vci.getFlowIdentifier())) {
// Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
// In order to do this, we have to ensure that the Process Group is 'current'.
- final boolean current = vci.isCurrent();
- if (!current) {
+ final VersionedFlowState state = vci.getStatus().getState();
+ if (state == VersionedFlowState.STALE || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
@@ -4441,10 +4421,15 @@ public final class StandardProcessGroup implements ProcessGroup {
private void verifyNoDescendantsWithLocalModifications(final String action) {
for (final ProcessGroup descendant : findAllProcessGroups()) {
final VersionControlInformation descendantVci = descendant.getVersionControlInformation();
- if (descendantVci != null && descendantVci.isModified()) {
- throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
- + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
- + "this action can be performed on the parent Process Group.");
+ if (descendantVci != null) {
+ final VersionedFlowState flowState = descendantVci.getStatus().getState();
+ final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
+
+ if (modified) {
+ throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ + "this action can be performed on the parent Process Group.");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
index f362c1e..4be9898 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java
@@ -17,21 +17,16 @@
package org.apache.nifi.groups;
-import java.util.Set;
-
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
-import org.apache.nifi.registry.flow.diff.FlowDifference;
class StandardVersionedFlowStatus implements VersionedFlowStatus {
private final VersionedFlowState state;
private final String explanation;
- private final Set<FlowDifference> currentDifferences;
- StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set<FlowDifference> differences) {
+ StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation) {
this.state = state;
this.explanation = explanation;
- this.currentDifferences = differences;
}
@Override
@@ -43,8 +38,4 @@ class StandardVersionedFlowStatus implements VersionedFlowStatus {
public String getStateExplanation() {
return explanation;
}
-
- Set<FlowDifference> getCurrentDifferences() {
- return currentDifferences;
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java
new file mode 100644
index 0000000..50c640b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import java.util.Set;
+
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+
+public class VersionControlFields { // Mutable holder of four version-control fields; every accessor below is package-private.
+ private volatile boolean locallyModified; // false until set; presumably mirrors the LOCALLY_MODIFIED flow state -- TODO confirm against the owning process group
+ private volatile boolean stale; // false until set; presumably mirrors the STALE flow state -- TODO confirm against the owning process group
+ private volatile String syncFailureExplanation = "Not yet synchronized with Flow Registry"; // initial value held until setSyncFailureExplanation is called
+ private volatile Set<FlowDifference> flowDifferences; // null until setFlowDifferences is called, so getFlowDifferences() may return null
+ // Each accessor reads or writes exactly one volatile field; there is no locking, so updates spanning several fields are not atomic as a group.
+ boolean isLocallyModified() {
+ return locallyModified;
+ }
+
+ void setLocallyModified(final boolean locallyModified) {
+ this.locallyModified = locallyModified;
+ }
+
+ boolean isStale() {
+ return stale;
+ }
+
+ void setStale(final boolean stale) {
+ this.stale = stale;
+ }
+
+ String getSyncFailureExplanation() {
+ return syncFailureExplanation;
+ }
+
+ void setSyncFailureExplanation(final String syncFailureExplanation) {
+ this.syncFailureExplanation = syncFailureExplanation; // stored as given; no null check is performed here
+ }
+
+ Set<FlowDifference> getFlowDifferences() {
+ return flowDifferences; // returns the stored reference itself, not a copy
+ }
+
+ void setFlowDifferences(final Set<FlowDifference> flowDifferences) {
+ this.flowDifferences = flowDifferences; // stores the caller's set without copying
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
index 106d19a..feef0e0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
@@ -32,8 +32,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
private volatile String flowDescription;
private final int version;
private volatile VersionedProcessGroup flowSnapshot;
- private volatile boolean modified;
- private volatile boolean current;
private final VersionedFlowStatus status;
public static class Builder {
@@ -46,8 +44,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
private String flowDescription;
private int version;
private VersionedProcessGroup flowSnapshot;
- private Boolean modified = null;
- private Boolean current = null;
private VersionedFlowStatus status;
public Builder registryId(String registryId) {
@@ -90,16 +86,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
return this;
}
- public Builder modified(boolean modified) {
- this.modified = modified;
- return this;
- }
-
- public Builder current(boolean current) {
- this.current = current;
- return this;
- }
-
public Builder flowSnapshot(VersionedProcessGroup snapshot) {
this.flowSnapshot = snapshot;
return this;
@@ -119,8 +105,17 @@ public class StandardVersionControlInformation implements VersionControlInformat
.flowId(dto.getFlowId())
.flowName(dto.getFlowName())
.flowDescription(dto.getFlowDescription())
- .current(dto.getCurrent() == null ? true : dto.getCurrent())
- .modified(dto.getModified() == null ? false : dto.getModified())
+ .status(new VersionedFlowStatus() {
+ @Override
+ public VersionedFlowState getState() {
+ return VersionedFlowState.valueOf(dto.getState());
+ }
+
+ @Override
+ public String getStateExplanation() {
+ return dto.getStateExplanation();
+ }
+ })
.version(dto.getVersion());
return builder;
@@ -133,7 +128,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
Objects.requireNonNull(version, "Version must be specified");
final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
- bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status);
+ bucketIdentifier, flowIdentifier, version, flowSnapshot, status);
svci.setBucketName(bucketName);
svci.setFlowName(flowName);
@@ -145,15 +140,13 @@ public class StandardVersionControlInformation implements VersionControlInformat
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
- final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) {
+ final VersionedProcessGroup snapshot, final VersionedFlowStatus status) {
this.registryIdentifier = registryId;
this.registryName = registryName;
this.bucketIdentifier = bucketId;
this.flowIdentifier = flowId;
this.version = version;
this.flowSnapshot = snapshot;
- this.modified = modified;
- this.current = current;
this.status = status;
}
@@ -215,28 +208,10 @@ public class StandardVersionControlInformation implements VersionControlInformat
}
@Override
- public boolean isModified() {
- return modified;
- }
-
- @Override
- public boolean isCurrent() {
- return current;
- }
-
- @Override
public VersionedProcessGroup getFlowSnapshot() {
return flowSnapshot;
}
- public void setModified(final boolean modified) {
- this.modified = modified;
- }
-
- public void setCurrent(final boolean current) {
- this.current = current;
- }
-
public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
this.flowSnapshot = flowSnapshot;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 5808500..0b9c6f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -178,9 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
- final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
- backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS);
}
@Override
@@ -197,6 +195,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e});
}
});
+
+ final Runnable checkAuthorizations = new InitializationTask();
+ backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 60L, TimeUnit.SECONDS);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 78335f4..514cd18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1442,20 +1442,6 @@ public interface NiFiServiceFacade {
*/
void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot);
- /**
- * Updates the Process group with the given ID to match the new snapshot
- *
- * @param revision the revision of the Process Group
- * @param groupId the ID of the Process Group
- * @param versionControlInfo the Version Control information
- * @param snapshot the new snapshot
- * @param componentIdSeed the seed to use for generating new component ID's
- * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
- * update the contents of that Process Group
- * @return the Process Group
- */
- ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
- boolean verifyNotModified, boolean updateDescendantVersionedFlows);
/**
* Updates the Process group with the given ID to match the new snapshot
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index ae89ef0..a2d6e41 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -97,6 +97,7 @@ import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
@@ -292,6 +293,7 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -1592,7 +1594,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final D dto = dtoCreation.apply(component);
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<D>(dto, lastMod);
+ return new StandardRevisionUpdate<>(dto, lastMod);
});
}
@@ -1779,7 +1781,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
- final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<SnippetDTO>(dto, null);
+ final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null);
return entityFactory.createSnippetEntity(snapshot.getComponent());
}
@@ -2088,7 +2090,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<ControllerServiceDTO>(dto, lastMod);
+ return new StandardRevisionUpdate<>(dto, lastMod);
});
} else {
snapshot = revisionManager.updateRevision(claim, user, () -> {
@@ -2098,7 +2100,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<ControllerServiceDTO>(dto, lastMod);
+ return new StandardRevisionUpdate<>(dto, lastMod);
});
}
@@ -2440,7 +2442,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
- return new StandardRevisionUpdate<FlowRegistry>(registry, lastModification);
+ return new StandardRevisionUpdate<>(registry, lastModification);
});
final FlowRegistry updatedReg = revisionUpdate.getComponent();
@@ -2483,7 +2485,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask);
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
- return new StandardRevisionUpdate<ReportingTaskDTO>(dto, lastMod);
+ return new StandardRevisionUpdate<>(dto, lastMod);
});
final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
@@ -3649,6 +3651,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+
final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
@@ -3697,15 +3700,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(bucket.getIdentifier());
vci.setBucketName(bucket.getName());
- vci.setCurrent(true);
vci.setFlowId(flow.getIdentifier());
vci.setFlowName(flow.getName());
vci.setFlowDescription(flow.getDescription());
vci.setGroupId(groupId);
- vci.setModified(false);
vci.setRegistryId(registryId);
vci.setRegistryName(getFlowRegistryName(registryId));
vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
+ vci.setState(VersionedFlowState.UP_TO_DATE.name());
final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
@@ -3777,8 +3779,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
- final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison,
- diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED);
+ final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
final FlowComparisonEntity entity = new FlowComparisonEntity();
entity.setComponentDifferences(differenceDtos);
@@ -4079,30 +4080,88 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
}
- @Override
- public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
- final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
+ private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
+ final List<Revision> revisions = new ArrayList<>();
+ if (includeGroupRevision) {
+ revisions.add(revisionManager.getRevision(processGroup.getIdentifier()));
+ }
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
- return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows);
+ processGroup.findAllConnections().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllControllerServices().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllFunnels().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllInputPorts().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllOutputPorts().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllLabels().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllProcessGroups().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllProcessors().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllRemoteProcessGroups().stream()
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+ processGroup.findAllRemoteProcessGroups().stream()
+ .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
+ .map(component -> revisionManager.getRevision(component.getIdentifier()))
+ .forEach(revisions::add);
+
+ return revisions;
}
@Override
public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
- final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
- final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
- processGroupNode,
- () -> processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows),
- processGroup -> dtoFactory.createProcessGroupDto(processGroup));
+ final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ final List<Revision> revisions = getComponentRevisions(processGroup, false);
+ revisions.add(revision);
- final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
- final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
- final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
- final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
+ final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
+
+ final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
+ @Override
+ public RevisionUpdate<ProcessGroupDTO> update() {
+ // update the Process Group
+ processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
+
+ // update the revisions
+ final Set<Revision> updatedRevisions = revisions.stream()
+ .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
+ .collect(Collectors.toSet());
+
+ // save
+ controllerFacade.save();
+
+ // gather details for response
+ final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);
+
+ final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
+ final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
+ return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
+ }
+ });
+
+ final FlowModification lastModification = revisionUpdate.getLastModification();
+
+ final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+ final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
+ final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
+ final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
- return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
+ return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
}
private AuthorizationResult authorizeAction(final Action action) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index a3bb5b2..b3ccefb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,12 +16,32 @@
*/
package org.apache.nifi.web.api;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLStreamReader;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@@ -47,6 +67,7 @@ import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@@ -111,31 +132,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -161,6 +157,13 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+
/**
* RESTful endpoint for managing a Group.
*/
@@ -1657,8 +1660,8 @@ public class ProcessGroupResource extends ApplicationResource {
versionControlInfo.setFlowDescription(flow.getDescription());
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
- versionControlInfo.setModified(false);
- versionControlInfo.setCurrent(flowSnapshot.isLatest());
+ final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
+ versionControlInfo.setState(flowState.name());
// Step 3: Resolve Bundle info
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
@@ -1689,8 +1692,12 @@ public class ProcessGroupResource extends ApplicationResource {
}
}
},
- () -> {
- },
+ () -> {
+ final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
+ if (versionedFlowSnapshot != null) {
+ serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
+ }
+ },
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 5dc7325..950bd97 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -40,6 +40,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiServiceFacade;
@@ -166,14 +167,14 @@ public class VersionsResource extends ApplicationResource {
@POST
- @Consumes(MediaType.WILDCARD)
- @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.TEXT_PLAIN)
@Path("active-requests")
@ApiOperation(
value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will "
+ "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A "
+ "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.",
- response = VersionControlInformationEntity.class,
+ response = String.class,
notes = NON_GUARANTEED_ENDPOINT)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -186,7 +187,7 @@ public class VersionsResource extends ApplicationResource {
@ApiParam(value = "The versioned flow details.", required = true) final CreateActiveRequestEntity requestEntity) {
if (isReplicateRequest()) {
- return replicate(HttpMethod.POST);
+ return replicate(HttpMethod.POST, requestEntity);
}
if (requestEntity.getProcessGroupId() == null) {
@@ -548,11 +549,14 @@ public class VersionsResource extends ApplicationResource {
final CreateActiveRequestEntity activeRequestEntity = new CreateActiveRequestEntity();
activeRequestEntity.setProcessGroupId(groupId);
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
+
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
+ getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -761,18 +765,20 @@ public class VersionsResource extends ApplicationResource {
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
versionControlInfoDto.setBucketName(bucket.getName());
- versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount());
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
versionControlInfoDto.setFlowName(flow.getName());
versionControlInfoDto.setFlowDescription(flow.getDescription());
versionControlInfoDto.setGroupId(groupId);
- versionControlInfoDto.setModified(false);
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
versionControlInfoDto.setRegistryId(entity.getRegistryId());
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(entity.getRegistryId()));
- final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
- entity.getUpdateDescendantVersionedFlows());
+ final VersionedFlowState flowState = snapshotMetadata.getVersion() == flow.getVersionCount() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
+ versionControlInfoDto.setState(flowState.name());
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroupContents(user, rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
+ true, entity.getUpdateDescendantVersionedFlows());
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
@@ -1103,7 +1109,7 @@ public class VersionsResource extends ApplicationResource {
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true);
vcur.markComplete(updatedVersionControlEntity);
- } catch (final LifecycleManagementException e) {
+ } catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.setFailureReason("Failed to update flow to new version due to " + e);
}
@@ -1268,7 +1274,7 @@ public class VersionsResource extends ApplicationResource {
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true);
vcur.markComplete(updatedVersionControlEntity);
- } catch (final LifecycleManagementException e) {
+ } catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
}
@@ -1403,15 +1409,14 @@ public class VersionsResource extends ApplicationResource {
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(metadata.getBucketIdentifier());
vci.setBucketName(bucket.getName());
- vci.setCurrent(flowSnapshot.isLatest());
vci.setFlowDescription(flow.getDescription());
vci.setFlowId(flow.getIdentifier());
vci.setFlowName(flow.getName());
vci.setGroupId(groupId);
- vci.setModified(false);
vci.setRegistryId(requestVci.getRegistryId());
vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
vci.setVersion(metadata.getVersion());
+ vci.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name());
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ca781a4..5bdb040 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.api.dto;
+import javax.ws.rs.WebApplicationException;
+
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -114,7 +116,6 @@ import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
-import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedFlowState;
@@ -140,7 +141,6 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.action.ActionDTO;
@@ -192,7 +192,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
-import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
@@ -215,7 +214,6 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -234,8 +232,6 @@ public final class DtoFactory {
private ControllerServiceProvider controllerServiceProvider;
private EntityFactory entityFactory;
private Authorizer authorizer;
- private NiFiProperties properties;
- private FlowRegistryClient flowRegistryClient;
public ControllerConfigurationDTO createControllerConfigurationDto(final ControllerFacade controllerFacade) {
final ControllerConfigurationDTO dto = new ControllerConfigurationDTO();
@@ -2190,23 +2186,17 @@ public final class DtoFactory {
public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
- return createComponentDifferenceDtos(comparison, null);
- }
-
- public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final Predicate<FlowDifference> filter) {
final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>();
for (final FlowDifference difference : comparison.getDifferences()) {
- if (filter == null || filter.test(difference)) {
- final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
- final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
+ final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
+ final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
- final DifferenceDTO dto = new DifferenceDTO();
- dto.setDifferenceType(difference.getDifferenceType().getDescription());
- dto.setDifference(difference.getDescription());
+ final DifferenceDTO dto = new DifferenceDTO();
+ dto.setDifferenceType(difference.getDifferenceType().getDescription());
+ dto.setDifference(difference.getDescription());
- differences.add(dto);
- }
+ differences.add(dto);
}
for (final Map.Entry<ComponentDifferenceDTO, List<DifferenceDTO>> entry : differencesByComponent.entrySet()) {
@@ -2259,8 +2249,6 @@ public final class DtoFactory {
dto.setFlowName(versionControlInfo.getFlowName());
dto.setFlowDescription(versionControlInfo.getFlowDescription());
dto.setVersion(versionControlInfo.getVersion());
- dto.setCurrent(versionControlInfo.isCurrent());
- dto.setModified(versionControlInfo.isModified());
final VersionedFlowStatus status = versionControlInfo.getStatus();
final VersionedFlowState state = status.getState();
@@ -3501,8 +3489,6 @@ public final class DtoFactory {
copy.setFlowName(original.getFlowName());
copy.setFlowDescription(original.getFlowDescription());
copy.setVersion(original.getVersion());
- copy.setCurrent(original.getCurrent());
- copy.setModified(original.getModified());
copy.setState(original.getState());
copy.setStateExplanation(original.getStateExplanation());
return copy;
@@ -3833,12 +3819,4 @@ public final class DtoFactory {
public void setBulletinRepository(BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}
-
- public void setProperties(final NiFiProperties properties) {
- this.properties = properties;
- }
-
- public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
- this.flowRegistryClient = flowRegistryClient;
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe8b30bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index d25f294..52a18dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -234,6 +235,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
if (isNotNull(processGroupDTO.getPosition())) {
group.setPosition(new Position(processGroupDTO.getPosition().getX(), processGroupDTO.getPosition().getY()));
+ final ProcessGroup parent = group.getParent();
+ if (parent != null) {
+ parent.onComponentModified();
+ }
}
if (isNotNull(comments)) {
group.setComments(comments);
@@ -258,8 +263,6 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
.flowSnapshot(flowSnapshot)
- .modified(false)
- .current(true)
.build();
group.setVersionControlInformation(vci, versionedComponentMapping);
@@ -281,6 +284,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
+ group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())