You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:13:58 UTC
[10/50] nifi git commit: NIFI-4436: - Initial checkpoint: able ot
start version control and detect changes, in standalone mode,
still 'crude' implementation - Checkpoint: Can place flow under version
control and can determine if modified - Checkpoint: Ch
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
new file mode 100644
index 0000000..4b87b50
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncRequestManager<T> implements RequestManager<T> {
+ private static final Logger logger = LoggerFactory.getLogger(AsyncRequestManager.class);
+
+ private final long requestExpirationMillis;
+ private final int maxConcurrentRequests;
+ private final ConcurrentMap<String, AsynchronousWebRequest<T>> requests = new ConcurrentHashMap<>();
+
+ private final ExecutorService threadPool;
+
+
+ public AsyncRequestManager(final int maxConcurrentRequests, final long requestExpirationMillis, final String threadNamePrefix) {
+ this.requestExpirationMillis = requestExpirationMillis;
+ this.maxConcurrentRequests = maxConcurrentRequests;
+
+ this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(maxConcurrentRequests),
+ new ThreadFactory() {
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setName(threadNamePrefix + "-" + counter.incrementAndGet());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+
+ }
+
+ private String getKey(final String type, final String request) {
+ return type + "/" + request;
+ }
+
+ @Override
+ public void submitRequest(final String type, final String requestId, final AsynchronousWebRequest<T> request, final Consumer<AsynchronousWebRequest<T>> task) {
+ Objects.requireNonNull(type);
+ Objects.requireNonNull(requestId);
+ Objects.requireNonNull(request);
+ Objects.requireNonNull(task);
+
+ // before adding to the request map, purge any old requests. Must do this by creating a List of ID's
+ // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException.
+ final Date oneMinuteAgo = new Date(System.currentTimeMillis() - requestExpirationMillis);
+ final List<String> completedRequestIds = requests.entrySet().stream()
+ .filter(entry -> entry.getValue().isComplete())
+ .filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+
+ completedRequestIds.stream().forEach(id -> requests.remove(id));
+
+ final int requestCount = requests.size();
+ if (requestCount > maxConcurrentRequests) {
+ throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. "
+ + "Cannot issue any more requests until the older ones are deleted or expire");
+ }
+
+ final String key = getKey(type, requestId);
+ final AsynchronousWebRequest<T> existing = this.requests.putIfAbsent(key, request);
+ if (existing != null) {
+ throw new IllegalArgumentException("A requests already exists with this ID and type");
+ }
+
+ threadPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ task.accept(request);
+ } catch (final Exception e) {
+ logger.error("Failed to perform asynchronous task", e);
+ request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e);
+ request.setLastUpdated(new Date());
+ }
+ }
+ });
+ }
+
+
+ @Override
+ public AsynchronousWebRequest<T> removeRequest(final String type, final String id, final NiFiUser user) {
+ Objects.requireNonNull(type);
+ Objects.requireNonNull(id);
+ Objects.requireNonNull(user);
+
+ final String key = getKey(type, id);
+ final AsynchronousWebRequest<T> request = requests.get(key);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Request with identifier " + id);
+ }
+
+ if (!request.getUser().equals(user)) {
+ throw new IllegalArgumentException("Only the user that submitted the update request can delete it.");
+ }
+
+ if (!request.isComplete()) {
+ throw new IllegalStateException("Cannot remove the request because it is not yet complete");
+ }
+
+ return requests.remove(key);
+ }
+
+ @Override
+ public AsynchronousWebRequest<T> getRequest(final String type, final String id, final NiFiUser user) {
+ Objects.requireNonNull(type);
+ Objects.requireNonNull(id);
+ Objects.requireNonNull(user);
+
+ final String key = getKey(type, id);
+ final AsynchronousWebRequest<T> request = requests.get(key);
+ if (request == null) {
+ throw new ResourceNotFoundException("Could not find a Request with identifier " + id);
+ }
+
+ if (!request.getUser().equals(user)) {
+ throw new IllegalArgumentException("Only the user that submitted the update request can delete it.");
+ }
+
+ return request;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
new file mode 100644
index 0000000..d09f895
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+public interface AsynchronousWebRequest<T> {
+
+ /**
+ * @return the ID of the process group that the request is for
+ */
+ String getProcessGroupId();
+
+ /**
+ * @return whether or not this request has completed
+ */
+ boolean isComplete();
+
+ /**
+ * @return the Date at which the status of this request was last updated
+ */
+ Date getLastUpdated();
+
+ /**
+ * Updates the Date at which the status of this request was last updated
+ *
+ * @param date the date at which the status of this request was last updated
+ */
+ void setLastUpdated(Date date);
+
+ /**
+ * @return the user who submitted the request
+ */
+ NiFiUser getUser();
+
+ /**
+ * Indicates that this request has completed, successfully or otherwise
+ *
+ * @param results the results of the request
+ */
+ void markComplete(T results);
+
+ /**
+ * Updates the request to indicate the reason that the request failed
+ *
+ * @param explanation the reason that the request failed
+ */
+ void setFailureReason(String explanation);
+
+ /**
+ * Indicates the reason that the request failed, or <code>null</code> if the request has not failed
+ *
+ * @param explanation the reason that the request failed, or <code>null</code> if the request has not failed
+ */
+ String getFailureReason();
+
+ /**
+ * Returns the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed
+ *
+ * @return the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed
+ */
+ T getResults();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
new file mode 100644
index 0000000..580ab47
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.function.Consumer;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+public interface RequestManager<T> {
+
+ /**
+ * Submits a request to be performed in the background
+ *
+ * @param requestType the type of request to submit. This value can be anything and is used along with the id in order to create
+ * a composite key for the request so that different request types may easily be managed by the RequestManager.
+ * @param id the ID of the request
+ * @param request the request
+ * @param task the task that should be performed in the background
+ *
+ * @throws IllegalArgumentException if a request already exists with the given ID
+ * @throws NullPointerException if any argument is null
+ */
+ void submitRequest(String requestType, String id, AsynchronousWebRequest<T> request, Consumer<AsynchronousWebRequest<T>> task);
+
+ /**
+ * Retrieves the request with the given ID
+ *
+ * @param requestType the type of the request being retrieved
+ * @param id the ID of the request
+ * @param user the user who is retrieving the request
+ * @return the request with the given ID
+ *
+ * @throws ResourceNotFoundException if no request can be found with the given ID
+ * @throws IllegalArgumentException if the user given is not the user that submitted the request
+ * @throws NullPointerException if either the ID or the user is null
+ */
+ AsynchronousWebRequest<T> getRequest(String requestType, String id, NiFiUser user);
+
+ /**
+ * Removes the request with the given ID
+ *
+ * @param requestType the type of the request being removed
+ * @param id the ID of the request
+ * @param user the user who is retrieving the request
+ * @return the request with the given ID
+ *
+ * @throws ResourceNotFoundException if no request can be found with the given ID
+ * @throws IllegalArgumentException if the user given is not the user that submitted the request
+ * @throws IllegalStateException if the request with the given ID is not yet complete
+ * @throws NullPointerException if either the ID or the user is null
+ */
+ AsynchronousWebRequest<T> removeRequest(String requestType, String id, NiFiUser user);
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
new file mode 100644
index 0000000..8ba9a58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.concurrent;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+
+public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest<T> {
+ private final String id;
+ private final String processGroupId;
+ private final NiFiUser user;
+
+ private volatile boolean complete = false;
+ private volatile Date lastUpdated = new Date();
+ private volatile String failureReason;
+ private volatile T results;
+
+ public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) {
+ this.id = requestId;
+ this.processGroupId = processGroupId;
+ this.user = user;
+ }
+
+ public String getRequestId() {
+ return id;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return complete;
+ }
+
+ @Override
+ public String getProcessGroupId() {
+ return processGroupId;
+ }
+
+ @Override
+ public void markComplete(final T results) {
+ this.complete = true;
+ this.results = results;
+ this.lastUpdated = new Date();
+ }
+
+ @Override
+ public Date getLastUpdated() {
+ return lastUpdated;
+ }
+
+ @Override
+ public void setLastUpdated(final Date date) {
+ this.lastUpdated = lastUpdated;
+ }
+
+ @Override
+ public NiFiUser getUser() {
+ return user;
+ }
+
+ @Override
+ public void setFailureReason(final String explanation) {
+ this.failureReason = Objects.requireNonNull(explanation);
+ this.complete = true;
+ this.results = null;
+ }
+
+ @Override
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ @Override
+ public T getResults() {
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 58abdea..489e590 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,33 @@
*/
package org.apache.nifi.web.api.dto;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -112,6 +139,15 @@ import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -160,42 +196,19 @@ import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@@ -616,6 +629,7 @@ public final class DtoFactory {
dto.setzIndex(connection.getZIndex());
dto.setSource(createConnectableDto(connection.getSource()));
dto.setDestination(createConnectableDto(connection.getDestination()));
+ dto.setVersionedComponentId(connection.getVersionedComponentId().orElse(null));
dto.setBackPressureObjectThreshold(connection.getFlowFileQueue().getBackPressureObjectThreshold());
dto.setBackPressureDataSizeThreshold(connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
@@ -667,6 +681,7 @@ public final class DtoFactory {
dto.setId(connectable.getIdentifier());
dto.setName(isAuthorized ? connectable.getName() : connectable.getIdentifier());
dto.setType(connectable.getConnectableType().name());
+ dto.setVersionedComponentId(connectable.getVersionedComponentId().orElse(null));
if (connectable instanceof RemoteGroupPort) {
final RemoteGroupPort remoteGroupPort = (RemoteGroupPort) connectable;
@@ -708,6 +723,7 @@ public final class DtoFactory {
dto.setWidth(label.getSize().getWidth());
dto.setLabel(label.getValue());
dto.setParentGroupId(label.getProcessGroup().getIdentifier());
+ dto.setVersionedComponentId(label.getVersionedComponentId().orElse(null));
return dto;
}
@@ -824,6 +840,7 @@ public final class DtoFactory {
dto.setId(funnel.getIdentifier());
dto.setPosition(createPositionDto(funnel.getPosition()));
dto.setParentGroupId(funnel.getProcessGroup().getIdentifier());
+ dto.setVersionedComponentId(funnel.getVersionedComponentId().orElse(null));
return dto;
}
@@ -1228,6 +1245,7 @@ public final class DtoFactory {
dto.setParentGroupId(port.getProcessGroup().getIdentifier());
dto.setState(port.getScheduledState().toString());
dto.setType(port.getConnectableType().name());
+ dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null));
// if this port is on the root group, determine if its actually connected to another nifi
if (port instanceof RootGroupPort) {
@@ -1354,6 +1372,7 @@ public final class DtoFactory {
dto.setDeprecated(controllerServiceNode.isDeprecated());
dto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1);
+ dto.setVersionedComponentId(controllerServiceNode.getVersionedComponentId().orElse(null));
// sort a copy of the properties
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
@@ -1511,6 +1530,7 @@ public final class DtoFactory {
dto.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks());
dto.setUseCompression(port.isUseCompression());
dto.setExists(port.getTargetExists());
+ dto.setVersionedComponentId(port.getVersionedComponentId().orElse(null));
final BatchSettingsDTO batchDTO = new BatchSettingsDTO();
batchDTO.setCount(port.getBatchCount());
@@ -1619,6 +1639,7 @@ public final class DtoFactory {
dto.setInactiveRemoteInputPortCount(inactiveRemoteInputPortCount);
dto.setActiveRemoteOutputPortCount(activeRemoteOutputPortCount);
dto.setInactiveRemoteOutputPortCount(inactiveRemoteOutputPortCount);
+ dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
final ProcessGroupCounts counts = group.getCounts();
if (counts != null) {
@@ -1679,6 +1700,7 @@ public final class DtoFactory {
dto.setId(componentAuthorizable.getIdentifier());
dto.setParentGroupId(componentAuthorizable.getProcessGroupIdentifier());
dto.setName(authorizable.getResource().getName());
+
return dto;
}
@@ -1738,6 +1760,81 @@ public final class DtoFactory {
return dto;
}
+ public AffectedComponentEntity createAffectedComponentEntity(final ProcessorEntity processorEntity) {
+ if (processorEntity == null) {
+ return null;
+ }
+
+ final AffectedComponentEntity component = new AffectedComponentEntity();
+ component.setBulletins(processorEntity.getBulletins());
+ component.setId(processorEntity.getId());
+ component.setPermissions(processorEntity.getPermissions());
+ component.setPosition(processorEntity.getPosition());
+ component.setRevision(processorEntity.getRevision());
+ component.setUri(processorEntity.getUri());
+
+ final ProcessorDTO processorDto = processorEntity.getComponent();
+ final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+ componentDto.setId(processorDto.getId());
+ componentDto.setName(processorDto.getName());
+ componentDto.setProcessGroupId(processorDto.getParentGroupId());
+ componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ componentDto.setState(processorDto.getState());
+ componentDto.setValidationErrors(processorDto.getValidationErrors());
+ component.setComponent(componentDto);
+
+ return component;
+ }
+
+ public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) {
+ if (serviceEntity == null) {
+ return null;
+ }
+
+ final AffectedComponentEntity component = new AffectedComponentEntity();
+ component.setBulletins(serviceEntity.getBulletins());
+ component.setId(serviceEntity.getId());
+ component.setPermissions(serviceEntity.getPermissions());
+ component.setPosition(serviceEntity.getPosition());
+ component.setRevision(serviceEntity.getRevision());
+ component.setUri(serviceEntity.getUri());
+
+ final ControllerServiceDTO serviceDto = serviceEntity.getComponent();
+ final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+ componentDto.setId(serviceDto.getId());
+ componentDto.setName(serviceDto.getName());
+ componentDto.setProcessGroupId(serviceDto.getParentGroupId());
+ componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ componentDto.setState(serviceDto.getState());
+ componentDto.setValidationErrors(serviceDto.getValidationErrors());
+ component.setComponent(componentDto);
+
+ return component;
+ }
+
+ public AffectedComponentEntity createAffectedComponentEntity(final RemoteProcessGroupPortDTO remotePortDto, final String referenceType, final RemoteProcessGroupEntity rpgEntity) {
+ if (remotePortDto == null) {
+ return null;
+ }
+
+ final AffectedComponentEntity component = new AffectedComponentEntity();
+ component.setId(remotePortDto.getId());
+ component.setPermissions(rpgEntity.getPermissions());
+ component.setRevision(rpgEntity.getRevision());
+ component.setUri(rpgEntity.getUri());
+
+ final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+ componentDto.setId(remotePortDto.getId());
+ componentDto.setName(remotePortDto.getName());
+ componentDto.setProcessGroupId(remotePortDto.getGroupId());
+ componentDto.setReferenceType(referenceType);
+ componentDto.setState(remotePortDto.isTransmitting() ? "Running" : "Stopped");
+ component.setComponent(componentDto);
+
+ return component;
+ }
+
+
public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) {
final AffectedComponentDTO dto = new AffectedComponentDTO();
dto.setId(component.getIdentifier());
@@ -2047,6 +2144,8 @@ public final class DtoFactory {
dto.setPosition(createPositionDto(group.getPosition()));
dto.setComments(group.getComments());
dto.setName(group.getName());
+ dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
+ dto.setVersionControlInformation(createVersionControlInformationDto(group.getVersionControlInformation()));
final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
@@ -2070,6 +2169,68 @@ public final class DtoFactory {
return dto;
}
+ public VersionControlInformationDTO createVersionControlInformationDto(final VersionControlInformation versionControlInfo) {
+ if (versionControlInfo == null) {
+ return null;
+ }
+
+ final VersionControlInformationDTO dto = new VersionControlInformationDTO();
+ dto.setRegistryId(versionControlInfo.getRegistryIdentifier());
+ dto.setBucketId(versionControlInfo.getBucketIdentifier());
+ dto.setFlowId(versionControlInfo.getFlowIdentifier());
+ dto.setVersion(versionControlInfo.getVersion());
+ dto.setCurrent(versionControlInfo.getCurrent().orElse(null));
+ dto.setModified(versionControlInfo.getModified().orElse(null));
+ return dto;
+ }
+
+ public Map<String, String> createVersionControlComponentMappingDto(final InstantiatedVersionedProcessGroup group) {
+ final Map<String, String> mapping = new HashMap<>();
+
+ mapping.put(group.getInstanceId(), group.getIdentifier());
+ group.getProcessors().stream()
+ .map(proc -> (InstantiatedVersionedProcessor) proc)
+ .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier()));
+ group.getInputPorts().stream()
+ .map(port -> (InstantiatedVersionedPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ group.getOutputPorts().stream()
+ .map(port -> (InstantiatedVersionedPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ group.getControllerServices().stream()
+ .map(service -> (InstantiatedVersionedControllerService) service)
+ .forEach(service -> mapping.put(service.getInstanceId(), service.getIdentifier()));
+ group.getLabels().stream()
+ .map(label -> (InstantiatedVersionedLabel) label)
+ .forEach(label -> mapping.put(label.getInstanceId(), label.getIdentifier()));
+ group.getConnections().stream()
+ .map(conn -> (InstantiatedVersionedConnection) conn)
+ .forEach(conn -> mapping.put(conn.getInstanceId(), conn.getIdentifier()));
+ group.getRemoteProcessGroups().stream()
+ .map(rpg -> (InstantiatedVersionedRemoteProcessGroup) rpg)
+ .forEach(rpg -> {
+ mapping.put(rpg.getInstanceId(), rpg.getIdentifier());
+
+ rpg.getInputPorts().stream()
+ .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+
+ rpg.getOutputPorts().stream()
+ .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ });
+
+ group.getProcessGroups().stream()
+ .map(child -> (InstantiatedVersionedProcessGroup) child)
+ .forEach(child -> {
+ final Map<String, String> childMapping = createVersionControlComponentMappingDto(child);
+ mapping.putAll(childMapping);
+ });
+
+ return mapping;
+ }
+
+
/**
* Creates a ProcessGroupContentDTO from the specified ProcessGroup.
*
@@ -2418,6 +2579,7 @@ public final class DtoFactory {
dto.setDeprecated(node.isDeprecated());
dto.setExtensionMissing(node.isExtensionMissing());
dto.setMultipleVersionsAvailable(compatibleBundles.size() > 1);
+ dto.setVersionedComponentId(node.getVersionedComponentId().orElse(null));
dto.setType(node.getCanonicalClassName());
dto.setBundle(createBundleDto(bundleCoordinate));
@@ -2989,6 +3151,7 @@ public final class DtoFactory {
copy.setPosition(original.getPosition());
copy.setWidth(original.getWidth());
copy.setHeight(original.getHeight());
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3012,6 +3175,7 @@ public final class DtoFactory {
copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable());
copy.setPersistsState(original.getPersistsState());
copy.setValidationErrors(copy(original.getValidationErrors()));
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3020,6 +3184,7 @@ public final class DtoFactory {
copy.setId(original.getId());
copy.setParentGroupId(original.getParentGroupId());
copy.setPosition(original.getPosition());
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3088,6 +3253,7 @@ public final class DtoFactory {
copy.setExtensionMissing(original.getExtensionMissing());
copy.setMultipleVersionsAvailable(original.getMultipleVersionsAvailable());
copy.setValidationErrors(copy(original.getValidationErrors()));
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3132,6 +3298,7 @@ public final class DtoFactory {
copy.setzIndex(original.getzIndex());
copy.setLabelIndex(original.getLabelIndex());
copy.setBends(copy(original.getBends()));
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3164,6 +3331,7 @@ public final class DtoFactory {
copy.setUserAccessControl(copy(original.getUserAccessControl()));
copy.setGroupAccessControl(copy(original.getGroupAccessControl()));
copy.setValidationErrors(copy(original.getValidationErrors()));
+ copy.setVersionedComponentId(original.getVersionedComponentId());
return copy;
}
@@ -3180,6 +3348,8 @@ public final class DtoFactory {
copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount());
copy.setUseCompression(original.getUseCompression());
copy.setExists(original.getExists());
+ copy.setVersionedComponentId(original.getVersionedComponentId());
+
final BatchSettingsDTO batchOrg = original.getBatchSettings();
if (batchOrg != null) {
final BatchSettingsDTO batchCopy = new BatchSettingsDTO();
@@ -3199,8 +3369,10 @@ public final class DtoFactory {
copy.setInputPortCount(original.getInputPortCount());
copy.setInvalidCount(original.getInvalidCount());
copy.setName(original.getName());
+ copy.setVersionControlInformation(copy(original.getVersionControlInformation()));
copy.setOutputPortCount(original.getOutputPortCount());
copy.setParentGroupId(original.getParentGroupId());
+ copy.setVersionedComponentId(original.getVersionedComponentId());
copy.setRunningCount(original.getRunningCount());
copy.setStoppedCount(original.getStoppedCount());
@@ -3215,6 +3387,21 @@ public final class DtoFactory {
return copy;
}
+ public VersionControlInformationDTO copy(final VersionControlInformationDTO original) {
+ if (original == null) {
+ return null;
+ }
+
+ final VersionControlInformationDTO copy = new VersionControlInformationDTO();
+ copy.setRegistryId(original.getRegistryId());
+ copy.setBucketId(original.getBucketId());
+ copy.setFlowId(original.getFlowId());
+ copy.setVersion(original.getVersion());
+ copy.setCurrent(original.getCurrent());
+ copy.setModified(original.getModified());
+ return copy;
+ }
+
public RemoteProcessGroupDTO copy(final RemoteProcessGroupDTO original) {
final RemoteProcessGroupContentsDTO originalContents = original.getContents();
final RemoteProcessGroupContentsDTO copyContents = new RemoteProcessGroupContentsDTO();
@@ -3256,6 +3443,7 @@ public final class DtoFactory {
copy.setProxyUser(original.getProxyUser());
copy.setProxyPassword(original.getProxyPassword());
copy.setLocalNetworkInterface(original.getLocalNetworkInterface());
+ copy.setVersionedComponentId(original.getVersionedComponentId());
copy.setContents(copyContents);
@@ -3268,6 +3456,7 @@ public final class DtoFactory {
connectable.setId(port.getId());
connectable.setName(port.getName());
connectable.setType(type.name());
+ connectable.setVersionedComponentId(port.getVersionedComponentId());
return connectable;
}
@@ -3277,6 +3466,7 @@ public final class DtoFactory {
connectable.setId(processor.getId());
connectable.setName(processor.getName());
connectable.setType(ConnectableType.PROCESSOR.name());
+ connectable.setVersionedComponentId(processor.getVersionedComponentId());
return connectable;
}
@@ -3285,6 +3475,7 @@ public final class DtoFactory {
connectable.setGroupId(funnel.getParentGroupId());
connectable.setId(funnel.getId());
connectable.setType(ConnectableType.FUNNEL.name());
+ connectable.setVersionedComponentId(funnel.getVersionedComponentId());
return connectable;
}
@@ -3294,6 +3485,7 @@ public final class DtoFactory {
connectable.setId(remoteGroupPort.getId());
connectable.setName(remoteGroupPort.getName());
connectable.setType(type.name());
+ connectable.setVersionedComponentId(connectable.getVersionedComponentId());
return connectable;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 16781c6..dd8d67f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -67,6 +67,7 @@ import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import java.util.Date;
import java.util.List;
@@ -537,4 +538,11 @@ public final class EntityFactory {
}
return entity;
}
+
+ public VersionControlInformationEntity createVersionControlInformationEntity(final VersionControlInformationDTO dto, final RevisionDTO processGroupRevision) {
+ final VersionControlInformationEntity entity = new VersionControlInformationEntity();
+ entity.setVersionControlInformation(dto);
+ entity.setProcessGroupRevision(processGroupRevision);
+ return entity;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index a4e8000..615f00b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -1599,6 +1599,7 @@ public class ControllerFacade implements Authorizable {
final List<String> matches = new ArrayList<>();
addIfAppropriate(searchStr, port.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, port.getVersionedComponentId().orElse(null), "Version Control ID", matches);
addIfAppropriate(searchStr, port.getName(), "Name", matches);
addIfAppropriate(searchStr, port.getComments(), "Comments", matches);
@@ -1649,6 +1650,7 @@ public class ControllerFacade implements Authorizable {
final Processor processor = procNode.getProcessor();
addIfAppropriate(searchStr, procNode.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, procNode.getVersionedComponentId().orElse(null), "Version Control ID", matches);
addIfAppropriate(searchStr, procNode.getName(), "Name", matches);
addIfAppropriate(searchStr, procNode.getComments(), "Comments", matches);
@@ -1753,6 +1755,7 @@ public class ControllerFacade implements Authorizable {
}
addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches);
addIfAppropriate(searchStr, group.getName(), "Name", matches);
addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
@@ -1783,6 +1786,7 @@ public class ControllerFacade implements Authorizable {
// search id and name
addIfAppropriate(searchStr, connection.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, connection.getVersionedComponentId().orElse(null), "Version Control ID", matches);
addIfAppropriate(searchStr, connection.getName(), "Name", matches);
// search relationships
@@ -1864,6 +1868,7 @@ public class ControllerFacade implements Authorizable {
private ComponentSearchResultDTO search(final String searchStr, final RemoteProcessGroup group) {
final List<String> matches = new ArrayList<>();
addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, group.getVersionedComponentId().orElse(null), "Version Control ID", matches);
addIfAppropriate(searchStr, group.getName(), "Name", matches);
addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches);
@@ -1889,6 +1894,7 @@ public class ControllerFacade implements Authorizable {
private ComponentSearchResultDTO search(final String searchStr, final Funnel funnel) {
final List<String> matches = new ArrayList<>();
addIfAppropriate(searchStr, funnel.getIdentifier(), "Id", matches);
+ addIfAppropriate(searchStr, funnel.getVersionedComponentId().orElse(null), "Version Control ID", matches);
if (matches.isEmpty()) {
return null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index d7ca806..5f4dba5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -16,14 +16,17 @@
*/
package org.apache.nifi.web.dao;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
public interface ProcessGroupDAO {
@@ -104,6 +107,27 @@ public interface ProcessGroupDAO {
ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup);
/**
+ * Updates the process group so that it matches the proposed flow
+ *
+ * @param groupId the ID of the process group
+ * @param proposedSnapshot Flow the new version of the flow
+ * @param versionControlInformation the new Version Control Information
+ * @param the seed value to use for generating ID's for new components
+ * @return the process group
+ */
+ ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
+ boolean verifyNotModified);
+
+ /**
+ * Applies the given Version Control Information to the Process Group
+ *
+ * @param versionControlInformation the Version Control Information to apply
+ * @param versionedComponentMapping a mapping of Component ID to Versioned Component ID
+ * @return the Process Group
+ */
+ ProcessGroup updateVersionControlInformation(VersionControlInformationDTO versionControlInformation, Map<String, String> versionedComponentMapping);
+
+ /**
* Updates the specified variable registry
*
* @param variableRegistry the Variable Registry
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index ec584de..6fa316d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -16,15 +16,14 @@
*/
package org.apache.nifi.web.dao.impl;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
@@ -33,9 +32,14 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.dao.ProcessGroupDAO;
@@ -90,24 +94,30 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
- final Set<Connectable> connectables = new HashSet<>(componentIds.size());
for (final String componentId : componentIds) {
final Connectable connectable = group.findLocalConnectable(componentId);
if (connectable == null) {
- throw new ResourceNotFoundException("Unable to find component with id " + componentId);
- }
+ final RemoteGroupPort remotePort = group.findRemoteGroupPort(componentId);
+ if (remotePort == null) {
+ throw new ResourceNotFoundException("Unable to find component with id " + componentId);
+ }
- connectables.add(connectable);
- }
+ if (ScheduledState.RUNNING.equals(state)) {
+ remotePort.verifyCanStart();
+ } else {
+ remotePort.verifyCanStop();
+ }
- // verify as appropriate
- connectables.forEach(connectable -> {
+ continue;
+ }
+
+ // verify as appropriate
if (ScheduledState.RUNNING.equals(state)) {
group.verifyCanStart(connectable);
} else {
group.verifyCanStop(connectable);
}
- });
+ }
}
@Override
@@ -134,22 +144,46 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
for (final String componentId : componentIds) {
final Connectable connectable = group.findLocalConnectable(componentId);
if (ScheduledState.RUNNING.equals(state)) {
- if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
- final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
- future = CompletableFuture.allOf(future, processorFuture);
- } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
- connectable.getProcessGroup().startInputPort((Port) connectable);
- } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
- connectable.getProcessGroup().startOutputPort((Port) connectable);
+ switch (connectable.getConnectableType()) {
+ case PROCESSOR:
+ final CompletableFuture<?> processorFuture = connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
+ future = CompletableFuture.allOf(future, processorFuture);
+ break;
+ case INPUT_PORT:
+ connectable.getProcessGroup().startInputPort((Port) connectable);
+ break;
+ case OUTPUT_PORT:
+ connectable.getProcessGroup().startOutputPort((Port) connectable);
+ break;
+ case REMOTE_INPUT_PORT:
+ final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
+ remoteInputPort.getRemoteProcessGroup().startTransmitting(remoteInputPort);
+ break;
+ case REMOTE_OUTPUT_PORT:
+ final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
+ remoteOutputPort.getRemoteProcessGroup().startTransmitting(remoteOutputPort);
+ break;
}
} else {
- if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) {
- final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
- future = CompletableFuture.allOf(future, processorFuture);
- } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) {
- connectable.getProcessGroup().stopInputPort((Port) connectable);
- } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) {
- connectable.getProcessGroup().stopOutputPort((Port) connectable);
+ switch (connectable.getConnectableType()) {
+ case PROCESSOR:
+ final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable);
+ future = CompletableFuture.allOf(future, processorFuture);
+ break;
+ case INPUT_PORT:
+ connectable.getProcessGroup().stopInputPort((Port) connectable);
+ break;
+ case OUTPUT_PORT:
+ connectable.getProcessGroup().stopOutputPort((Port) connectable);
+ break;
+ case REMOTE_INPUT_PORT:
+ final RemoteGroupPort remoteInputPort = group.findRemoteGroupPort(componentId);
+ remoteInputPort.getRemoteProcessGroup().stopTransmitting(remoteInputPort);
+ break;
+ case REMOTE_OUTPUT_PORT:
+ final RemoteGroupPort remoteOutputPort = group.findRemoteGroupPort(componentId);
+ remoteOutputPort.getRemoteProcessGroup().stopTransmitting(remoteOutputPort);
+ break;
}
}
}
@@ -197,6 +231,41 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
+ public ProcessGroup updateVersionControlInformation(final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) {
+ final String groupId = versionControlInformation.getGroupId();
+ final ProcessGroup group = locateProcessGroup(flowController, groupId);
+
+ final String registryId = versionControlInformation.getRegistryId();
+ final String bucketId = versionControlInformation.getBucketId();
+ final String flowId = versionControlInformation.getFlowId();
+ final int version = versionControlInformation.getVersion();
+
+ final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, false, true);
+ group.setVersionControlInformation(vci, versionedComponentMapping);
+
+ return group;
+ }
+
+ @Override
+ public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
+ final String componentIdSeed, final boolean verifyNotModified) {
+ final ProcessGroup group = locateProcessGroup(flowController, groupId);
+ group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified);
+
+ final StandardVersionControlInformation svci = new StandardVersionControlInformation(
+ versionControlInformation.getRegistryId(),
+ versionControlInformation.getBucketId(),
+ versionControlInformation.getFlowId(),
+ versionControlInformation.getVersion(),
+ proposedSnapshot.getFlowContents(),
+ versionControlInformation.getModified(),
+ versionControlInformation.getCurrent());
+
+ group.setVersionControlInformation(svci, Collections.emptyMap());
+ return group;
+ }
+
+ @Override
public ProcessGroup updateVariableRegistry(final VariableRegistryDTO variableRegistry) {
final ProcessGroup group = locateProcessGroup(flowController, variableRegistry.getProcessGroupId());
if (group == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
new file mode 100644
index 0000000..7fdaf56
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+import java.util.Optional;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
+import org.apache.nifi.web.api.dto.DtoFactory;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+
+public class AffectedComponentUtils {
+
+ public static AffectedComponentEntity updateEntity(final AffectedComponentEntity componentEntity, final NiFiServiceFacade serviceFacade,
+ final DtoFactory dtoFactory, final NiFiUser user) {
+
+ switch (componentEntity.getComponent().getReferenceType()) {
+ case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR:
+ final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
+ return dtoFactory.createAffectedComponentEntity(procEntity);
+ case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: {
+ final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user);
+ final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents();
+ final Optional<RemoteProcessGroupPortDTO> portDtoOption = remoteGroupContents.getInputPorts().stream()
+ .filter(port -> port.getId().equals(componentEntity.getId()))
+ .findFirst();
+
+ if (portDtoOption.isPresent()) {
+ final RemoteProcessGroupPortDTO portDto = portDtoOption.get();
+ return dtoFactory.createAffectedComponentEntity(portDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT, remoteGroupEntity);
+ }
+ break;
+ }
+ case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT: {
+ final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user);
+ final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents();
+ final Optional<RemoteProcessGroupPortDTO> portDtoOption = remoteGroupContents.getOutputPorts().stream()
+ .filter(port -> port.getId().equals(componentEntity.getId()))
+ .findFirst();
+
+ if (portDtoOption.isPresent()) {
+ final RemoteProcessGroupPortDTO portDto = portDtoOption.get();
+ return dtoFactory.createAffectedComponentEntity(portDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT, remoteGroupEntity);
+ }
+ break;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
new file mode 100644
index 0000000..a6efb71
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class CancellableTimedPause implements Pause {
+ private final long expirationNanoTime;
+ private final long pauseNanos;
+ private volatile boolean cancelled = false;
+
+ public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) {
+ final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit);
+ expirationNanoTime = System.nanoTime() + expirationNanos;
+ pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit));
+ }
+
+ public void cancel() {
+ cancelled = true;
+ }
+
+ @Override
+ public boolean pause() {
+ if (cancelled) {
+ return false;
+ }
+
+ long sysTime = System.nanoTime();
+ final long maxWaitTime = System.nanoTime() + pauseNanos;
+ while (sysTime < maxWaitTime) {
+ try {
+ TimeUnit.NANOSECONDS.wait(pauseNanos);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ sysTime = System.nanoTime();
+ }
+
+ return sysTime < expirationNanoTime && !cancelled;
+ }
+
+}