You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:05 UTC
[17/50] nifi git commit: NIFI-4436: Bug fixes - Checkpoint before
allowing multiple Process Groups with same Versioned Component ID and same
parent - Ensure that if flow update is cancelled while processors are being
stopped/services disabled that we sto
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 3684f04..f2a207e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,21 +17,6 @@
package org.apache.nifi.web.api;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -92,10 +77,24 @@ import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
import org.apache.nifi.web.util.LifecycleManagementException;
-import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -454,51 +453,15 @@ public class VersionsResource extends ApplicationResource {
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
},
() -> {
- final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId);
- if (entity != null) {
- final String flowId = requestEntity.getVersionedFlow().getFlowId();
- if (flowId != null && flowId.equals(entity.getVersionControlInformation().getFlowId())) {
- // Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
- // In order to do this, we have to ensure that the Process Group is 'current'.
- final Boolean current = entity.getVersionControlInformation().getCurrent();
- if (current == null) {
- throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
- + " because it is not yet known whether or not this Process Group is the most recent version of the flow. "
- + "Please try the request again after the Process Group has been synchronized with the Flow Registry.");
- }
-
- if (current == Boolean.FALSE) {
- throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
- + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
- + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
- }
-
- // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
- // ensure that all other parameters match as well.
- if (!requestEntity.getVersionedFlow().getBucketId().equals(entity.getVersionControlInformation().getBucketId())) {
- throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
- + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
- }
-
- if (!requestEntity.getVersionedFlow().getRegistryId().equals(entity.getVersionControlInformation().getRegistryId())) {
- throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
- + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
- }
-
- } else if (flowId != null) {
- // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
- // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
- // attempting to save a new version of a different flow. Saving a new version of a different Flow is
- // not allowed because the Process Group must be in synch with the latest version of the flow before that
- // can be done.
- throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
- + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
- }
- }
+ final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow();
+ final String registryId = versionedFlow.getRegistryId();
+ final String bucketId = versionedFlow.getBucketId();
+ final String flowId = versionedFlow.getFlowId();
+ serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId);
},
(rev, flowEntity) -> {
// Register the current flow with the Flow Registry.
- final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
+ final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, flowEntity);
// Update the Process Group's Version Control Information
final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId,
@@ -756,7 +719,8 @@ public class VersionsResource extends ApplicationResource {
versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
- final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false);
+ final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
+ entity.getUpdateDescendantVersionedFlows());
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
@@ -1039,7 +1003,7 @@ public class VersionsResource extends ApplicationResource {
// 14. Re-Start all Processors, Funnels, Ports that are affected and not removed.
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
- final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
+ final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
@@ -1085,7 +1049,7 @@ public class VersionsResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
- affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true);
+ affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
@@ -1188,7 +1152,7 @@ public class VersionsResource extends ApplicationResource {
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
- final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
+ final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
@@ -1221,8 +1185,7 @@ public class VersionsResource extends ApplicationResource {
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
// Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
- // Step 5: Verify that Process Group is not 'dirty'
- serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, false);
+ serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot);
},
(revision, processGroupEntity) -> {
// Ensure that the information passed in is correct
@@ -1254,7 +1217,7 @@ public class VersionsResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
- affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false);
+ affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
@@ -1288,7 +1251,7 @@ public class VersionsResource extends ApplicationResource {
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
- final boolean verifyNotModified) throws LifecycleManagementException {
+ final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
// Steps 6-7: Determine which components must be stopped and stop them.
final Set<String> stoppableReferenceTypes = new HashSet<>();
@@ -1302,7 +1265,8 @@ public class VersionsResource extends ApplicationResource {
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());
- final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause);
if (asyncRequest.isCancelled()) {
@@ -1317,7 +1281,8 @@ public class VersionsResource extends ApplicationResource {
.collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
- final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause);
if (asyncRequest.isCancelled()) {
@@ -1328,96 +1293,113 @@ public class VersionsResource extends ApplicationResource {
logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
// If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
// by replicating a PUT to /nifi-api/versions/process-groups/{groupId}
- if (replicateRequest) {
+ try {
+ if (replicateRequest) {
- final URI updateUri;
- try {
- updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
- exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment());
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+ final URI updateUri;
+ try {
+ updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
+ exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
- final Map<String, String> headers = new HashMap<>();
- headers.put("content-type", MediaType.APPLICATION_JSON);
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
- final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
- snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
- snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
- snapshotEntity.setVersionedFlow(flowSnapshot);
+ final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
+ snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
+ snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
+ snapshotEntity.setVersionedFlow(flowSnapshot);
+ snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows);
- final NodeResponse clusterResponse;
- try {
- if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
- } else {
- clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+ final NodeResponse clusterResponse;
+ try {
+ logger.debug("Replicating PUT request to {} for user {}", updateUri, user);
+
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
+ }
+ } catch (final InterruptedException ie) {
+ logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user);
+ Thread.currentThread().interrupt();
+ throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
- } catch (final InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
- }
- final int disableServicesStatus = clusterResponse.getStatus();
- if (disableServicesStatus != Status.OK.getStatusCode()) {
- final String explanation = getResponseEntity(clusterResponse, String.class);
- throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+ final int updateFlowStatus = clusterResponse.getStatus();
+ if (updateFlowStatus != Status.OK.getStatusCode()) {
+ final String explanation = getResponseEntity(clusterResponse, String.class);
+ logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
+ updateUri, user, updateFlowStatus, explanation);
+ throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
+ }
+
+ } else {
+ // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
+ // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
+ // Ensure that no Output Port was removed, unless it currently has no outgoing connections.
+ serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
+
+ // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
+ final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
+ final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
+ final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
+
+ final Bucket bucket = flowSnapshot.getBucket();
+ final VersionedFlow flow = flowSnapshot.getFlow();
+
+ final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
+ final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+ vci.setBucketId(metadata.getBucketIdentifier());
+ vci.setBucketName(bucket.getName());
+ vci.setCurrent(flowSnapshot.isLatest());
+ vci.setFlowDescription(flow.getDescription());
+ vci.setFlowId(flow.getIdentifier());
+ vci.setFlowName(flow.getName());
+ vci.setGroupId(groupId);
+ vci.setModified(false);
+ vci.setRegistryId(requestVci.getRegistryId());
+ vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
+ vci.setVersion(metadata.getVersion());
+
+ serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows);
}
+ } finally {
+ if (!asyncRequest.isCancelled()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
+ }
- } else {
- // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
- // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
- // Ensure that no Output Port was removed, unless it currently has no outgoing connections.
- serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
-
- // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
- final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
- final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
- final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
-
- final Bucket bucket = flowSnapshot.getBucket();
- final VersionedFlow flow = flowSnapshot.getFlow();
-
- final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
- final VersionControlInformationDTO vci = new VersionControlInformationDTO();
- vci.setBucketId(metadata.getBucketIdentifier());
- vci.setBucketName(bucket.getName());
- vci.setCurrent(flowSnapshot.isLatest());
- vci.setFlowDescription(flow.getDescription());
- vci.setFlowId(flow.getIdentifier());
- vci.setFlowName(flow.getName());
- vci.setGroupId(groupId);
- vci.setModified(false);
- vci.setRegistryId(requestVci.getRegistryId());
- vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
- vci.setVersion(metadata.getVersion());
-
- serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
- }
+ asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
- if (asyncRequest.isCancelled()) {
- return null;
- }
- asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
+ // Step 13. Re-enable all disabled controller services
+ final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ asyncRequest.setCancelCallback(enableServicesPause::cancel);
+ final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
+ logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
+ componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+ }
- // Step 13. Re-enable all disabled controller services
- final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
- logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
- componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+ if (!asyncRequest.isCancelled()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents);
+ }
- if (asyncRequest.isCancelled()) {
- return null;
- }
- asyncRequest.update(new Date(), "Restarting Processors", 80);
+ asyncRequest.update(new Date(), "Restarting Processors", 80);
- // Step 14. Restart all components
- final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
- final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- logger.info("Restarting {} Processors", componentsToStart.size());
- componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+ // Step 14. Restart all components
+ final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
+ final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ asyncRequest.setCancelCallback(startComponentsPause::cancel);
+ logger.info("Restarting {} Processors", componentsToStart.size());
+ componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+ }
+ }
+ asyncRequest.setCancelCallback(null);
if (asyncRequest.isCancelled()) {
return null;
}
@@ -1426,6 +1408,7 @@ public class VersionsResource extends ApplicationResource {
return serviceFacade.getVersionControlInformation(groupId);
}
+
/**
* Extracts the response entity from the specified node response.
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
index 1309eee..3cecdeb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -99,4 +99,12 @@ public interface AsynchronousWebRequest<T> {
* @return <code>true</code> if the request has been canceled, <code>false</code> otherwise
*/
boolean isCancelled();
+
+ /**
+ * Sets the cancel callback to the given runnable, so that if {@link #cancel()} is called, the given {@link Runnable} will be triggered.
+ * If <code>null</code> is passed, no operation will be triggered when the task is cancelled.
+ *
+ * @param runnable the callback
+ */
+ void setCancelCallback(Runnable runnable);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index 4810a32..8e2e221 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -34,6 +34,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
private volatile String failureReason;
private volatile boolean cancelled;
private volatile T results;
+ private volatile Runnable cancelCallback;
public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) {
this.id = requestId;
@@ -57,6 +58,11 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
}
@Override
+ public void setCancelCallback(final Runnable runnable) {
+ this.cancelCallback = runnable;
+ }
+
+ @Override
public void markComplete(final T results) {
this.complete = true;
this.results = results;
@@ -130,6 +136,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
percentComplete = 100;
state = "Canceled by user";
setFailureReason("Request cancelled by user");
+ cancelCallback.run();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6077268..7d40473 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -118,6 +118,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel;
@@ -2202,15 +2203,23 @@ public final class DtoFactory {
private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
VersionedComponent component = difference.getComponentA();
- if (component == null) {
+ if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) {
component = difference.getComponentB();
}
final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
- dto.setComponentId(component.getIdentifier());
dto.setComponentName(component.getName());
dto.setComponentType(component.getComponentType().name());
- dto.setProcessGroupId(dto.getProcessGroupId());
+
+ if (component instanceof InstantiatedVersionedComponent) {
+ final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component;
+ dto.setComponentId(instantiatedComponent.getInstanceId());
+ dto.setProcessGroupId(instantiatedComponent.getInstanceGroupId());
+ } else {
+ dto.setComponentId(component.getIdentifier());
+ dto.setProcessGroupId(dto.getProcessGroupId());
+ }
+
return dto;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 7cf61ea..9259bf4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -114,10 +114,12 @@ public interface ProcessGroupDAO {
* @param versionControlInformation the new Version Control Information
* @param componentIdSeed the seed value to use for generating ID's for new components
* @param updateSettings whether or not to update the process group's name and position
+ * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
+ * update the contents of that Process Group
* @return the process group
*/
ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
- boolean verifyNotModified, boolean updateSettings);
+ boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Applies the given Version Control Information to the Process Group
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index e3c4725..bb7edb1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -29,6 +29,8 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -244,8 +246,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+ final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false);
+
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
+ .flowSnapshot(flowSnapshot)
.modified(false)
.current(true)
.build();
@@ -264,9 +270,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
- final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
+ final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
- group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
+ group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
index 1f83a6f..dea43f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -43,7 +43,7 @@ public class CancellableTimedPause implements Pause {
long sysTime = System.nanoTime();
final long maxWaitTime = System.nanoTime() + pauseNanos;
- while (sysTime < maxWaitTime) {
+ while (sysTime < maxWaitTime && !cancelled) {
try {
TimeUnit.NANOSECONDS.sleep(pauseNanos);
} catch (final InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/adacb204/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index 4469ea1..11bd1b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -421,18 +421,24 @@ public final class SnippetUtils {
}
// get a list of all names of process groups so that we can rename as needed.
- final List<String> groupNames = new ArrayList<>();
+ final Set<String> groupNames = new HashSet<>();
for (final ProcessGroup childGroup : group.getProcessGroups()) {
groupNames.add(childGroup.getName());
}
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
- String groupName = groupDTO.getName();
- while (groupNames.contains(groupName)) {
- groupName = "Copy of " + groupName;
+ // If Version Control Information is present, then we don't want to rename the
+ // Process Group - we want it to remain the same as the one in Version Control.
+ // However, in order to disambiguate things, we generally do want to rename to
+ // 'Copy of...' so we do this only if there is no Version Control Information present.
+ if (groupDTO.getVersionControlInformation() == null) {
+ String groupName = groupDTO.getName();
+ while (groupNames.contains(groupName)) {
+ groupName = "Copy of " + groupName;
+ }
+ groupDTO.setName(groupName);
}
- groupDTO.setName(groupName);
groupNames.add(groupDTO.getName());
}
}