You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/21 14:05:12 UTC
[60/64] incubator-nifi git commit: Merge branch 'develop' into
NIFI-250
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c5d452c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 0000000,c8683b0..41660f2
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@@ -1,0 -1,2877 +1,2885 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.web;
+
-import org.apache.nifi.web.OptimisticLockingManager;
-import org.apache.nifi.web.ConfigurationSnapshot;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.ListIterator;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TimeZone;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+
+ import javax.ws.rs.WebApplicationException;
+
+ import org.apache.nifi.action.Action;
+ import org.apache.nifi.action.Component;
+ import org.apache.nifi.action.Operation;
+ import org.apache.nifi.action.details.PurgeDetails;
+ import org.apache.nifi.admin.service.AccountNotFoundException;
+ import org.apache.nifi.admin.service.AuditService;
+ import org.apache.nifi.admin.service.UserService;
+ import org.apache.nifi.authorization.Authority;
+ import org.apache.nifi.cluster.HeartbeatPayload;
+ import org.apache.nifi.cluster.context.ClusterContext;
+ import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+ import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+ import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+ import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+ import org.apache.nifi.cluster.node.Node;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.controller.Counter;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.Snippet;
+ import org.apache.nifi.controller.Template;
+ import org.apache.nifi.controller.label.Label;
+ import org.apache.nifi.controller.repository.claim.ContentDirection;
+ import org.apache.nifi.controller.status.ConnectionStatus;
+ import org.apache.nifi.controller.status.PortStatus;
+ import org.apache.nifi.controller.status.ProcessGroupStatus;
+ import org.apache.nifi.controller.status.ProcessorStatus;
+ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+ import org.apache.nifi.diagnostics.SystemDiagnostics;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.groups.ProcessGroupCounts;
+ import org.apache.nifi.groups.RemoteProcessGroup;
+ import org.apache.nifi.history.History;
+ import org.apache.nifi.history.HistoryQuery;
+ import org.apache.nifi.history.PreviousValue;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.reporting.Bulletin;
+ import org.apache.nifi.reporting.BulletinQuery;
+ import org.apache.nifi.reporting.BulletinRepository;
+ import org.apache.nifi.web.security.user.NiFiUserUtils;
+ import org.apache.nifi.user.AccountStatus;
+ import org.apache.nifi.user.NiFiUser;
+ import org.apache.nifi.user.NiFiUserGroup;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.web.api.dto.BulletinBoardDTO;
+ import org.apache.nifi.web.api.dto.BulletinDTO;
+ import org.apache.nifi.web.api.dto.BulletinQueryDTO;
+ import org.apache.nifi.web.api.dto.ClusterDTO;
+ import org.apache.nifi.web.api.dto.ConnectionDTO;
+ import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
+ import org.apache.nifi.web.api.dto.ControllerDTO;
+ import org.apache.nifi.web.api.dto.CounterDTO;
+ import org.apache.nifi.web.api.dto.CountersDTO;
+ import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+ import org.apache.nifi.web.api.dto.DtoFactory;
+ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+ import org.apache.nifi.web.api.dto.FunnelDTO;
+ import org.apache.nifi.web.api.dto.LabelDTO;
+ import org.apache.nifi.web.api.dto.NodeDTO;
+ import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
+ import org.apache.nifi.web.api.dto.PortDTO;
+ import org.apache.nifi.web.api.dto.PreviousValueDTO;
+ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+ import org.apache.nifi.web.api.dto.ProcessorDTO;
+ import org.apache.nifi.web.api.dto.ProcessorHistoryDTO;
+ import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+ import org.apache.nifi.web.api.dto.RevisionDTO;
+ import org.apache.nifi.web.api.dto.SnippetDTO;
+ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
+ import org.apache.nifi.web.api.dto.TemplateDTO;
+ import org.apache.nifi.web.api.dto.UserDTO;
+ import org.apache.nifi.web.api.dto.UserGroupDTO;
+ import org.apache.nifi.web.api.dto.action.ActionDTO;
+ import org.apache.nifi.web.api.dto.action.HistoryDTO;
+ import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
+ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
+ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
+ import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+ import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO;
+ import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
+ import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO;
+ import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO;
+ import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+ import org.apache.nifi.web.controller.ControllerFacade;
+ import org.apache.nifi.web.dao.ConnectionDAO;
+ import org.apache.nifi.web.dao.FunnelDAO;
+ import org.apache.nifi.web.dao.LabelDAO;
+ import org.apache.nifi.web.dao.PortDAO;
+ import org.apache.nifi.web.dao.ProcessGroupDAO;
+ import org.apache.nifi.web.dao.ProcessorDAO;
+ import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
+ import org.apache.nifi.web.dao.SnippetDAO;
+ import org.apache.nifi.web.dao.TemplateDAO;
+ import org.apache.nifi.web.util.DownloadableContent;
+ import org.apache.nifi.web.util.SnippetUtils;
+
+ import org.apache.commons.collections4.CollectionUtils;
+ import org.apache.commons.lang3.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.security.access.AccessDeniedException;
+
+ /**
+ * Implementation of NiFiServiceFacade that performs revision checking.
+ */
+ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
+ private static final String INVALID_REVISION_ERROR = "Given revision %s does not match current revision %s.";
+ private static final String SYNC_ERROR = "This NiFi instance has been updated by '%s'. Please refresh to synchronize the view.";
+
+ // nifi core components
+ private ControllerFacade controllerFacade;
+ private SnippetUtils snippetUtils;
+
+ // optimistic locking manager
+ private OptimisticLockingManager optimisticLockingManager;
+
+ // data access objects
+ private ProcessorDAO processorDAO;
+ private ProcessGroupDAO processGroupDAO;
+ private RemoteProcessGroupDAO remoteProcessGroupDAO;
+ private LabelDAO labelDAO;
+ private FunnelDAO funnelDAO;
+ private SnippetDAO snippetDAO;
+ private PortDAO inputPortDAO;
+ private PortDAO outputPortDAO;
+ private ConnectionDAO connectionDAO;
+ private TemplateDAO templateDAO;
+
+ // administrative services
+ private AuditService auditService;
+ private UserService userService;
+
+ // cluster manager
+ private WebClusterManager clusterManager;
+
+ // properties
+ private NiFiProperties properties;
+ private DtoFactory dtoFactory;
+
+ /**
+ * Checks the specified revision against the current revision.
+ *
+ * @param revision The revision to check
+ * @param clientId The client id
+ * @return Whether or not the request should proceed
+ * @throws NiFiCoreException If the specified revision is not current
+ */
+ private void checkRevision(Revision revision) {
+
+ boolean approved = optimisticLockingManager.isCurrent(revision);
+
+ if (!approved) {
+ Revision currentRevision = optimisticLockingManager.getRevision();
+ logger.debug("Revision check failed because current revision is " + currentRevision + " but supplied revision is " + revision);
+
+ if (StringUtils.isBlank(currentRevision.getClientId()) || currentRevision.getVersion() == null) {
+ throw new InvalidRevisionException(String.format(INVALID_REVISION_ERROR, revision, currentRevision));
+ } else {
+ throw new InvalidRevisionException(String.format(SYNC_ERROR, optimisticLockingManager.getLastModifier()));
+ }
+ }
+ }
+
+ /**
+ * Increments the revision and updates the last modifier.
+ *
+ * @param revision
+ * @return
+ */
+ private Revision updateRevision(Revision revision) {
+ // update the client id and modifier
+ final Revision updatedRevision = optimisticLockingManager.incrementRevision(revision.getClientId());
+
+ // get the nifi user to extract the username
+ NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ optimisticLockingManager.setLastModifier("unknown");
+ } else {
+ optimisticLockingManager.setLastModifier(user.getUserName());
+ }
+
+ return updatedRevision;
+ }
+
+ // -----------------------------------------
+ // Verification Operations
+ // -----------------------------------------
+ @Override
+ public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) {
+ connectionDAO.verifyCreate(groupId, connectionDTO);
+ }
+
+ @Override
+ public void verifyUpdateConnection(String groupId, ConnectionDTO connectionDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (connectionDAO.hasConnection(groupId, connectionDTO.getId())) {
+ connectionDAO.verifyUpdate(groupId, connectionDTO);
+ } else {
+ connectionDAO.verifyCreate(groupId, connectionDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteConnection(String groupId, String connectionId) {
+ connectionDAO.verifyDelete(groupId, connectionId);
+ }
+
+ @Override
+ public void verifyDeleteFunnel(String groupId, String funnelId) {
+ funnelDAO.verifyDelete(groupId, funnelId);
+ }
+
+ @Override
+ public void verifyUpdateInputPort(String groupId, PortDTO inputPortDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (inputPortDAO.hasPort(groupId, inputPortDTO.getId())) {
+ inputPortDAO.verifyUpdate(groupId, inputPortDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteInputPort(String groupId, String inputPortId) {
+ inputPortDAO.verifyDelete(groupId, inputPortId);
+ }
+
+ @Override
+ public void verifyUpdateOutputPort(String groupId, PortDTO outputPortDTO) {
+ // if connection does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (outputPortDAO.hasPort(groupId, outputPortDTO.getId())) {
+ outputPortDAO.verifyUpdate(groupId, outputPortDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteOutputPort(String groupId, String outputPortId) {
+ outputPortDAO.verifyDelete(groupId, outputPortId);
+ }
+
+ @Override
+ public void verifyUpdateProcessor(ProcessorDTO processorDTO) {
+ final String groupId = controllerFacade.findProcessGroupIdForProcessor(processorDTO.getId());
+
+ // if processor does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (groupId != null) {
+ verifyUpdateProcessor(groupId, processorDTO);
+ }
+ }
+
+ @Override
+ public void verifyUpdateProcessor(String groupId, ProcessorDTO processorDTO) {
+ // if processor does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (processorDAO.hasProcessor(groupId, processorDTO.getId())) {
+ processorDAO.verifyUpdate(groupId, processorDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteProcessor(String groupId, String processorId) {
+ processorDAO.verifyDelete(groupId, processorId);
+ }
+
+ @Override
+ public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
+ // if group does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
+ processGroupDAO.verifyUpdate(processGroupDTO);
+ }
+ }
+
+ @Override
+ public void verifyDeleteProcessGroup(String groupId) {
+ processGroupDAO.verifyDelete(groupId);
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ // if remote group does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId())) {
+ remoteProcessGroupDAO.verifyUpdate(groupId, remoteProcessGroupDTO);
+ }
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroupInputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ remoteProcessGroupDAO.verifyUpdateInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+ }
+
+ @Override
+ public void verifyUpdateRemoteProcessGroupOutputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ remoteProcessGroupDAO.verifyUpdateOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+ }
+
+ @Override
+ public void verifyDeleteRemoteProcessGroup(String groupId, String remoteProcessGroupId) {
+ remoteProcessGroupDAO.verifyDelete(groupId, remoteProcessGroupId);
+ }
+
+ // -----------------------------------------
+ // Write Operations
+ // -----------------------------------------
+ @Override
+ public ConfigurationSnapshot<ConnectionDTO> updateConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
+
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if connection does not exist, then create new connection
+ if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
+ return createConnection(revision, groupId, connectionDTO);
+ }
+
+ final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessorDTO> updateProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
+
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if processor does not exist, then create new processor
+ if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
+ return createProcessor(revision, groupId, processorDTO);
+ }
+
+ // update the processor
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<LabelDTO> updateLabel(Revision revision, String groupId, LabelDTO labelDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if label does not exist, then create new label
+ if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
+ return createLabel(revision, groupId, labelDTO);
+ }
+
+ // update the existing label
+ final Label label = labelDAO.updateLabel(groupId, labelDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<FunnelDTO> updateFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if label does not exist, then create new label
+ if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
+ return createFunnel(revision, groupId, funnelDTO);
+ }
+
+ // update the existing label
+ final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public void verifyUpdateSnippet(SnippetDTO snippetDto) {
+ // if snippet does not exist, then the update request is likely creating it
+ // so we don't verify since it will fail
+ if (snippetDAO.hasSnippet(snippetDto.getId())) {
+ snippetDAO.verifyUpdate(snippetDto);
+ }
+ }
+
+ @Override
+ public ConfigurationSnapshot<SnippetDTO> updateSnippet(Revision revision, SnippetDTO snippetDto) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if label does not exist, then create new label
+ if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
+ return createSnippet(revision, snippetDto);
+ }
+
+ // update the snippet
+ final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
+
+ // build the snippet dto
+ final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDto);
+
+ // save updated controller if applicable
+ if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
+ controllerFacade.save();
+ }
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<PortDTO> updateInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if input port does not exist, then create new input port
+ if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
+ return createInputPort(revision, groupId, inputPortDTO);
+ }
+
+ final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<PortDTO> updateOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if output port does not exist, then create new output port
+ if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
+ return createOutputPort(revision, groupId, outputPortDTO);
+ }
+
+ final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if controller reference does not exist, then create new controller reference
+ if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
+ return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
+ }
+
+ RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(Revision revision, String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // update the remote port
+ RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<RemoteProcessGroupPortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(Revision revision, String parentGroupId, ProcessGroupDTO processGroupDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // if process group does not exist, then create new process group
+ if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
+ if (parentGroupId == null) {
+ throw new IllegalArgumentException("Unable to create the specified process group since the parent group was not specified.");
+ } else {
+ return createProcessGroup(parentGroupId, revision, processGroupDTO);
+ }
+ }
+
+ // update the process group
+ ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+
+ // save updated controller
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // update the controller configuration through the proxy
+ if (controllerConfigurationDTO.getName() != null) {
+ controllerFacade.setName(controllerConfigurationDTO.getName());
+ }
+ if (controllerConfigurationDTO.getComments() != null) {
+ controllerFacade.setComments(controllerConfigurationDTO.getComments());
+ }
+ if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
+ controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
+ }
+ if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
+ controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
+ }
+
+ // create the controller configuration dto
+ ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ControllerConfigurationDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), controllerConfig);
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public NodeDTO updateNode(NodeDTO nodeDTO) {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+ final String userDn = user.getDn();
+
+ if (Node.Status.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+ clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn);
+ } else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+ clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn);
+ } else {
+ // handle primary
+ final Boolean primary = nodeDTO.isPrimary();
+ if (primary != null && primary) {
+ clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn);
+ }
+ }
+
+ final String nodeId = nodeDTO.getNodeId();
+ return dtoFactory.createNodeDTO(clusterManager.getNode(nodeId), clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
+ }
+
+ @Override
+ public CounterDTO updateCounter(String counterId) {
+ return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ connectionDAO.deleteConnection(groupId, connectionId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteProcessor(Revision revision, String groupId, String processorId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // delete the processor and synchronize the connection state
+ processorDAO.deleteProcessor(groupId, processorId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteLabel(Revision revision, String groupId, String labelId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // delete the label
+ labelDAO.deleteLabel(groupId, labelId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteFunnel(Revision revision, String groupId, String funnelId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // delete the label
+ funnelDAO.deleteFunnel(groupId, funnelId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public void verifyDeleteSnippet(String id) {
+ snippetDAO.verifyDelete(id);
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteSnippet(Revision revision, String snippetId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // determine if this snippet was linked to the data flow
+ Snippet snippet = snippetDAO.getSnippet(snippetId);
+ boolean linked = snippet.isLinked();
+
+ // delete the snippet
+ snippetDAO.deleteSnippet(snippetId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow if necessary
+ if (linked) {
+ controllerFacade.save();
+ }
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteInputPort(Revision revision, String groupId, String inputPortId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ inputPortDAO.deletePort(groupId, inputPortId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteOutputPort(Revision revision, String groupId, String outputPortId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ outputPortDAO.deletePort(groupId, outputPortId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteProcessGroup(Revision revision, String groupId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ processGroupDAO.deleteProcessGroup(groupId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(Revision revision, String groupId, String remoteProcessGroupId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public void deleteTemplate(String id) {
+ // create the template
+ templateDAO.deleteTemplate(id);
+ }
+
+ @Override
+ public ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO) {
+
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(connectionDTO.getId())) {
+ connectionDTO.setId(UUID.randomUUID().toString());
+ }
+
+ final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ConnectionDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createConnectionDto(connection));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessorDTO> createProcessor(Revision revision, String groupId, ProcessorDTO processorDTO) {
+
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(processorDTO.getId())) {
+ processorDTO.setId(UUID.randomUUID().toString());
+ }
+
+ // create the processor
+ final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<LabelDTO> createLabel(Revision revision, String groupId, LabelDTO labelDTO) {
+
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(labelDTO.getId())) {
+ labelDTO.setId(UUID.randomUUID().toString());
+ }
+
+ // add the label
+ final Label label = labelDAO.createLabel(groupId, labelDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<LabelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createLabelDto(label));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<FunnelDTO> createFunnel(Revision revision, String groupId, FunnelDTO funnelDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(funnelDTO.getId())) {
+ funnelDTO.setId(UUID.randomUUID().toString());
+ }
+
+ // add the label
+ final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<FunnelDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createFunnelDto(funnel));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
+ // validate any processors
+ if (flowSnippet.getProcessors() != null) {
+ for (final ProcessorDTO processorDTO : flowSnippet.getProcessors()) {
+ final ProcessorNode processorNode = processorDAO.getProcessor(groupId, processorDTO.getId());
+ final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ processorDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ if (flowSnippet.getInputPorts() != null) {
+ for (final PortDTO portDTO : flowSnippet.getInputPorts()) {
+ final Port port = inputPortDAO.getPort(groupId, portDTO.getId());
+ final Collection<ValidationResult> validationErrors = port.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ portDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ if (flowSnippet.getOutputPorts() != null) {
+ for (final PortDTO portDTO : flowSnippet.getOutputPorts()) {
+ final Port port = outputPortDAO.getPort(groupId, portDTO.getId());
+ final Collection<ValidationResult> validationErrors = port.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+ portDTO.setValidationErrors(errors);
+ }
+ }
+ }
+
+ // get any remote process group issues
+ if (flowSnippet.getRemoteProcessGroups() != null) {
+ for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flowSnippet.getRemoteProcessGroups()) {
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId());
+ if (remoteProcessGroup.getAuthorizationIssue() != null) {
+ remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue()));
+ }
+ }
+ }
+ }
+
+ @Override
+ public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(Revision revision, String groupId, String snippetId, Double originX, Double originY) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(snippetId)) {
+ snippetId = UUID.randomUUID().toString();
+ }
+
+ // create the new snippet
+ FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
+
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(snippetDTO.getId())) {
+ snippetDTO.setId(UUID.randomUUID().toString());
+ }
+
+ // add the snippet
+ final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
+ final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
+ responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+
+ // create the response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<SnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), responseSnippetDTO);
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<PortDTO> createInputPort(Revision revision, String groupId, PortDTO inputPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(inputPortDTO.getId())) {
+ inputPortDTO.setId(UUID.randomUUID().toString());
+ }
+
+ final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(inputPort));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<PortDTO> createOutputPort(Revision revision, String groupId, PortDTO outputPortDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(outputPortDTO.getId())) {
+ outputPortDTO.setId(UUID.randomUUID().toString());
+ }
+
+ final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<PortDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createPortDto(outputPort));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(String parentGroupId, Revision revision, ProcessGroupDTO processGroupDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(processGroupDTO.getId())) {
+ processGroupDTO.setId(UUID.randomUUID().toString());
+ }
+
+ final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessGroupDto(processGroup));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(Revision revision, String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // ensure id is set
+ if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
+ remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
+ }
+
+ final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<RemoteProcessGroupDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public TemplateDTO createTemplate(String name, String description, String snippetId) {
+ // get the specified snippet
+ Snippet snippet = snippetDAO.getSnippet(snippetId);
+
+ // create the template
+ TemplateDTO templateDTO = new TemplateDTO();
+ templateDTO.setName(name);
+ templateDTO.setDescription(description);
+ templateDTO.setTimestamp(new Date());
+ templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true));
+
+ // set the id based on the specified seed
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+ }
+
+ // create the template
+ Template template = templateDAO.createTemplate(templateDTO);
+
+ return dtoFactory.createTemplateDTO(template);
+ }
+
+ @Override
+ public TemplateDTO importTemplate(TemplateDTO templateDTO) {
+ // ensure id is set
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
+ }
+
+ // mark the timestamp
+ templateDTO.setTimestamp(new Date());
+
+ // import the template
+ final Template template = templateDAO.importTemplate(templateDTO);
+
+ // return the template dto
+ return dtoFactory.createTemplateDTO(template);
+ }
+
+ @Override
+ public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(Revision revision, String groupId, Double originX, Double originY, String templateId) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
+ // was copied and this dto is only used to instantiate it's components (which as already completed)
+ FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+
+ // validate the new snippet
+ validateSnippetContents(flowSnippet, groupId);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<FlowSnippetDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), flowSnippet);
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<Void> createArchive(Revision revision) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // create the archive
+ controllerFacade.createArchive();
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<Void> response = new ConfigurationSnapshot<>(updatedRevision.getVersion());
+ return response;
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(Revision revision, String processorId, String annotationData) {
+ // ensure the proper revision before performing the update
+ checkRevision(revision);
+
+ // create the processor config
+ final ProcessorConfigDTO config = new ProcessorConfigDTO();
+ config.setAnnotationData(annotationData);
+
+ // create the processor dto
+ final ProcessorDTO processorDTO = new ProcessorDTO();
+ processorDTO.setId(processorId);
+ processorDTO.setConfig(config);
+
+ // get the parent group id for the specified processor
+ String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
+
+ // ensure the parent group id was found
+ if (groupId == null) {
+ throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
+ }
+
+ // update the processor configuration
+ ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
+
+ // update the revision and generate a response
+ final Revision updatedRevision = updateRevision(revision);
+ final ConfigurationSnapshot<ProcessorDTO> response = new ConfigurationSnapshot<>(updatedRevision.getVersion(), dtoFactory.createProcessorDto(processor));
+
+ // save the flow
+ controllerFacade.save();
+
+ return response;
+ }
+
+ @Override
+ public void deleteActions(Date endDate) {
+ // get the user from the request
+ NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ // create the purge details
+ PurgeDetails details = new PurgeDetails();
+ details.setEndDate(endDate);
+
+ // create a purge action to record that records are being removed
+ Action purgeAction = new Action();
+ purgeAction.setUserDn(user.getDn());
+ purgeAction.setUserName(user.getUserName());
+ purgeAction.setOperation(Operation.Purge);
+ purgeAction.setTimestamp(new Date());
+ purgeAction.setSourceId("Flow Controller");
+ purgeAction.setSourceName("History");
+ purgeAction.setSourceType(Component.Controller);
+ purgeAction.setActionDetails(details);
+
+ // purge corresponding actions
+ auditService.purgeActions(endDate, purgeAction);
+ }
+
+ @Override
+ public void invalidateUser(String userId) {
+ try {
+ userService.invalidateUserAccount(userId);
+ } catch (final AccountNotFoundException anfe) {
+ // ignore
+ }
+ }
+
+ @Override
+ public void invalidateUserGroup(String userGroup, Set<String> userIds) {
+ // invalidates any user currently associated with this group
+ if (userGroup != null) {
+ userService.invalidateUserGroupAccount(userGroup);
+ }
+
+ // invalidates any user that will be associated with this group
+ if (userIds != null) {
+ for (final String userId : userIds) {
+ invalidateUser(userId);
+ }
+ }
+ }
+
+ @Override
+ public UserDTO updateUser(UserDTO userDto) {
+ NiFiUser user;
+
+ // attempt to parse the user id
+ final String id = userDto.getId();
+
+ // determine the authorities that have been specified in the request
+ Set<Authority> authorities = null;
+ if (userDto.getAuthorities() != null) {
+ authorities = Authority.convertRawAuthorities(userDto.getAuthorities());
+ }
+
+ // if the account status isn't specified or isn't changing
+ final AccountStatus accountStatus = AccountStatus.valueOfStatus(userDto.getStatus());
+ if (accountStatus == null || AccountStatus.ACTIVE.equals(accountStatus)) {
+ // ensure that authorities have been specified (may be empty, but not null)
+ if (authorities == null) {
+ throw new IllegalArgumentException("Authorities must be specified when updating an account.");
+ }
+
+ // update the user account
+ user = userService.update(id, authorities);
+ } else if (AccountStatus.DISABLED.equals(accountStatus)) {
+ // disable the account
+ user = userService.disable(id);
+ } else {
+ throw new IllegalArgumentException("Accounts cannot be marked pending.");
+ }
+
+ return dtoFactory.createUserDTO(user);
+ }
+
+ @Override
+ public void deleteUser(String userId) {
+ userService.deleteUser(userId);
+ }
+
+ @Override
+ public UserGroupDTO updateUserGroup(final UserGroupDTO userGroupDTO) {
+ NiFiUserGroup userGroup;
+
+ // convert the authorities
+ Set<Authority> authorities = null;
+ if (userGroupDTO.getAuthorities() != null) {
+ authorities = Authority.convertRawAuthorities(userGroupDTO.getAuthorities());
+ }
+
+ final AccountStatus accountStatus = AccountStatus.valueOfStatus(userGroupDTO.getStatus());
+ if (accountStatus == null || AccountStatus.ACTIVE.equals(accountStatus)) {
+ // update the user group
+ userGroup = userService.updateGroup(userGroupDTO.getGroup(), userGroupDTO.getUserIds(), authorities);
+ } else if (AccountStatus.DISABLED.equals(accountStatus)) {
+ // disable the accounts
+ userGroup = userService.disableGroup(userGroupDTO.getGroup());
+ } else {
+ throw new IllegalArgumentException("Accounts cannot be marked pending.");
+ }
+
+ // generate the user group dto
+ return dtoFactory.createUserGroupDTO(userGroup);
+ }
+
+ @Override
+ public void removeUserFromGroup(String userId) {
+ userService.ungroupUser(userId);
+ }
+
+ @Override
+ public void removeUserGroup(String userGroup) {
+ userService.ungroup(userGroup);
+ }
+
+ @Override
+ public ProvenanceDTO submitProvenance(ProvenanceDTO query) {
+ return controllerFacade.submitProvenance(query);
+ }
+
+ @Override
+ public void deleteProvenance(String queryId) {
+ controllerFacade.deleteProvenanceQuery(queryId);
+ }
+
+ @Override
+ public LineageDTO submitLineage(LineageDTO lineage) {
+ return controllerFacade.submitLineage(lineage);
+ }
+
+ @Override
+ public void deleteLineage(String lineageId) {
+ controllerFacade.deleteLineage(lineageId);
+ }
+
+ @Override
+ public ProvenanceEventDTO submitReplay(Long eventId) {
+ return controllerFacade.submitReplay(eventId);
+ }
+
+ // -----------------------------------------
+ // Read Operations
+ // -----------------------------------------
+ @Override
+ public RevisionDTO getRevision() {
+ return dtoFactory.createRevisionDTO(optimisticLockingManager.getRevision());
+ }
+
+ @Override
+ public SearchResultsDTO searchController(String query) {
+ return controllerFacade.search(query);
+ }
+
+ @Override
+ public DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection) {
+ return controllerFacade.getContent(eventId, uri, contentDirection);
+ }
+
+ @Override
+ public ProvenanceDTO getProvenance(String queryId) {
+ return controllerFacade.getProvenanceQuery(queryId);
+ }
+
+ @Override
+ public LineageDTO getLineage(String lineageId) {
+ return controllerFacade.getLineage(lineageId);
+ }
+
+ @Override
+ public ProvenanceOptionsDTO getProvenanceSearchOptions() {
+ return controllerFacade.getProvenanceSearchOptions();
+ }
+
+ @Override
+ public ProvenanceEventDTO getProvenanceEvent(final Long id) {
+ return controllerFacade.getProvenanceEvent(id);
+ }
+
+ @Override
+ public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) {
+ ProcessGroupStatusDTO statusReport;
+ if (properties.isClusterManager()) {
+ final ProcessGroupStatus mergedProcessGroupStatus = clusterManager.getProcessGroupStatus(groupId);
+ if (mergedProcessGroupStatus == null) {
+ throw new ResourceNotFoundException(String.format("Unable to find status for process group %s.", groupId));
+ }
+ statusReport = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), mergedProcessGroupStatus);
+ } else {
+ statusReport = controllerFacade.getProcessGroupStatus(groupId);
+ }
+ return statusReport;
+ }
+
+ @Override
+ public ControllerStatusDTO getControllerStatus() {
+ final ControllerStatusDTO controllerStatus;
+
+ if (properties.isClusterManager()) {
+ final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
+
+ if (connectedNodes.isEmpty()) {
+ throw new NoConnectedNodesException();
+ }
+
+ int activeThreadCount = 0;
+ long totalFlowFileObjectCount = 0;
+ long totalFlowFileByteCount = 0;
+ for (final Node node : connectedNodes) {
+ final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
+ if (nodeHeartbeatPayload == null) {
+ continue;
+ }
+
+ activeThreadCount += nodeHeartbeatPayload.getActiveThreadCount();
+ totalFlowFileObjectCount += nodeHeartbeatPayload.getTotalFlowFileCount();
+ totalFlowFileByteCount += nodeHeartbeatPayload.getTotalFlowFileBytes();
+ }
+
+ controllerStatus = new ControllerStatusDTO();
+ controllerStatus.setActiveThreadCount(activeThreadCount);
+ controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + " / " + FormatUtils.formatDataSize(totalFlowFileByteCount));
+
+ final int numNodes = clusterManager.getNodeIds().size();
+ controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + numNodes);
+
+ // get the bulletins for the controller
+ final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository();
+ final List<Bulletin> results = bulletinRepository.findBulletinsForController();
+ final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
+ for (final Bulletin bulletin : results) {
+ bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
+ }
+ controllerStatus.setBulletins(bulletinDtos);
+
+ // get the component counts by extracting them from the roots' group status
+ final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root");
+ if (status != null) {
+ final ProcessGroupCounts counts = extractProcessGroupCounts(status);
+ controllerStatus.setRunningCount(counts.getRunningCount());
+ controllerStatus.setStoppedCount(counts.getStoppedCount());
+ controllerStatus.setInvalidCount(counts.getInvalidCount());
+ controllerStatus.setDisabledCount(counts.getDisabledCount());
+ controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount());
+ controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
+ }
+ } else {
+ // get the controller status
+ controllerStatus = controllerFacade.getControllerStatus();
+ }
+
+ // determine if there are any pending user accounts - only include if appropriate
+ if (NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) {
+ controllerStatus.setHasPendingAccounts(userService.hasPendingUserAccount());
+ }
+
+ return controllerStatus;
+ }
+
+ @Override
+ public CountersDTO getCounters() {
+ if (properties.isClusterManager()) {
+ final Map<String, CounterDTO> mergedCountersMap = new HashMap<>();
+ final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
+
+ if (connectedNodes.isEmpty()) {
+ throw new NoConnectedNodesException();
+ }
+
+ for (final Node node : connectedNodes) {
+ final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
+ if (nodeHeartbeatPayload == null) {
+ continue;
+ }
+ final List<Counter> nodeCounters = node.getHeartbeatPayload().getCounters();
+ if (nodeCounters == null) {
+ continue;
+ }
+
+ // for each node, add its counter values to the aggregate values
+ for (final Counter nodeCounter : nodeCounters) {
+ final CounterDTO mergedCounter = mergedCountersMap.get(nodeCounter.getIdentifier());
+
+ // either create a new aggregate counter or update the aggregate counter
+ if (mergedCounter == null) {
+ // add new counter
+ mergedCountersMap.put(nodeCounter.getIdentifier(), dtoFactory.createCounterDto(nodeCounter));
+ } else {
+ // update aggregate counter
+ mergedCounter.setValueCount(mergedCounter.getValueCount() + nodeCounter.getValue());
+ mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount()));
+ }
+ }
+ }
+
+ final CountersDTO mergedCounters = new CountersDTO();
+ mergedCounters.setGenerated(new Date());
+ mergedCounters.setCounters(mergedCountersMap.values());
+ return mergedCounters;
+ } else {
+ List<Counter> counters = controllerFacade.getCounters();
+ Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
+ for (Counter counter : counters) {
+ counterDTOs.add(dtoFactory.createCounterDto(counter));
+ }
+ return dtoFactory.createCountersDto(counterDTOs);
+ }
+
+ }
+
+ @Override
+ public Set<ConnectionDTO> getConnections(String groupId) {
+ Set<ConnectionDTO> connectionDtos = new LinkedHashSet<>();
+ for (Connection connection : connectionDAO.getConnections(groupId)) {
+ connectionDtos.add(dtoFactory.createConnectionDto(connection));
+ }
+ return connectionDtos;
+ }
+
+ @Override
+ public ConnectionDTO getConnection(String groupId, String connectionId) {
+ return dtoFactory.createConnectionDto(connectionDAO.getConnection(groupId, connectionId));
+ }
+
+ @Override
+ public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
+ return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
+ }
+
+ @Override
+ public Set<ProcessorDTO> getProcessors(String groupId) {
+ Set<ProcessorDTO> processorDtos = new LinkedHashSet<>();
+ for (ProcessorNode processor : processorDAO.getProcessors(groupId)) {
+ processorDtos.add(dtoFactory.createProcessorDto(processor));
+ }
+ return processorDtos;
+ }
+
+ @Override
+ public TemplateDTO exportTemplate(String id) {
+ Template template = templateDAO.getTemplate(id);
+ TemplateDTO templateDetails = template.getDetails();
+
+ TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template);
+ templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet()));
+ return templateDTO;
+ }
+
+ @Override
+ public TemplateDTO getTemplate(String id) {
+ return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
+ }
+
+ @Override
+ public Set<TemplateDTO> getTemplates() {
+ Set<TemplateDTO> templateDtos = new LinkedHashSet<>();
+ for (Template template : templateDAO.getTemplates()) {
+ templateDtos.add(dtoFactory.createTemplateDTO(template));
+ }
+ return templateDtos;
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
+ return controllerFacade.getFlowFileComparatorTypes();
+ }
+
+ @Override
+ public Set<DocumentedTypeDTO> getProcessorTypes() {
+ return controllerFacade.getFlowFileProcessorTypes();
+ }
+
+ @Override
++ public Set<DocumentedTypeDTO> getControllerServiceTypes() {
++ return controllerFacade.getControllerServiceTypes();
++ }
++
++ @Override
++ public Set<DocumentedTypeDTO> getReportingTaskTypes() {
++ return controllerFacade.getReportingTaskTypes();
++ }
++
++ @Override
+ public ProcessorDTO getProcessor(String groupId, String id) {
+ final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
+ final ProcessorDTO processorDto = dtoFactory.createProcessorDto(processor);
+ return processorDto;
+ }
+
+ @Override
+ public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) {
+ return controllerFacade.getProcessorStatusHistory(groupId, id);
+ }
+
+ @Override
+ public BulletinBoardDTO getBulletinBoard(BulletinQueryDTO query) {
+ // build the query
+ final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
+ .groupIdMatches(query.getGroupId())
+ .sourceIdMatches(query.getSourceId())
+ .nameMatches(query.getName())
+ .messageMatches(query.getMessage())
+ .after(query.getAfter())
+ .limit(query.getLimit());
+
+ // get the bulletin repository
+ final BulletinRepository bulletinRepository;
+ if (properties.isClusterManager()) {
+ bulletinRepository = clusterManager.getBulletinRepository();
+ } else {
+ bulletinRepository = controllerFacade.getBulletinRepository();
+ }
+
+ // perform the query
+ final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());
+
+ // perform the query and generate the results - iterating in reverse order since we are
+ // getting the most recent results by ordering by timestamp desc above. this gets the
+ // exact results we want but in reverse order
+ final List<BulletinDTO> bulletins = new ArrayList<>();
+ for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious();) {
+ bulletins.add(dtoFactory.createBulletinDto(bulletinIter.previous()));
+ }
+
+ // create the bulletin board
+ BulletinBoardDTO bulletinBoard = new BulletinBoardDTO();
+ bulletinBoard.setBulletins(bulletins);
+ bulletinBoard.setGenerated(new Date());
+ return bulletinBoard;
+ }
+
+ @Override
+ public SystemDiagnosticsDTO getSystemDiagnostics() {
+ final SystemDiagnosticsDTO dto;
+ if (properties.isClusterManager()) {
+ final SystemDiagnostics clusterDiagnostics = clusterManager.getSystemDiagnostics();
+ if (clusterDiagnostics == null) {
+ throw new IllegalStateException("Nodes are connected but no systems diagnostics have been reported.");
+ }
+ dto = dtoFactory.createSystemDiagnosticsDto(clusterDiagnostics);
+ } else {
+ final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
+ dto = dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
+ }
+ return dto;
+ }
+
+ /**
+ * Ensures the specified user has permission to access the specified port.
+ *
+ * @param user
+ * @param port
+ * @return
+ */
+ private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
+ final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
+
+ // if site to site is not secure, allow all users
+ if (!isSiteToSiteSecure) {
+ return true;
+ }
+
+ final Set<String> allowedUsers = port.getUserAccessControl();
+ if (allowedUsers.contains(user.getDn())) {
+ return true;
+ }
+
+ final String userGroup = user.getUserGroup();
+ if (userGroup == null) {
+ return false;
+ }
+
+ final Set<String> allowedGroups = port.getGroupAccessControl();
+ return allowedGroups.contains(userGroup);
+ }
+
+ @Override
+ public ControllerDTO getController() {
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ if (user == null) {
+ throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+ }
+
+ // at this point we know that the user must have ROLE_NIFI because it's required
+ // get to the endpoint that calls this method but we'll check again anyways
+ final Set<Authority> authorities = user.getAuthorities();
+ if (!authorities.contains(Authority.ROLE_NIFI)) {
+ throw new AccessDeniedException("User must have the NiFi role in order to access these details.");
+ }
+
+ // serialize the input ports this NiFi has access to
+ final Set<PortDTO> inputPorts = new LinkedHashSet<>();
+ for (RootGroupPort inputPort : controllerFacade.getInputPorts()) {
+ if (isUserAuthorized(user, inputPort)) {
+ final PortDTO dto = new PortDTO();
+ dto.setId(inputPort.getIdentifier());
+ dto.setName(inputPort.getName());
+ dto.setComments(inputPort.getComments());
+ dto.setState(inputPort.getScheduledState().toString());
+ inputPorts.add(dto);
+ }
+ }
+
+ // serialize the output ports this NiFi has access to
+ final Set<PortDTO> outputPorts = new LinkedHashSet<>();
+ for (RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
+ if (isUserAuthorized(user, outputPort)) {
+ final PortDTO dto = new PortDTO();
+ dto.setId(outputPort.getIdentifier());
+ dto.setName(outputPort.getName());
+ dto.setComments(outputPort.getComments());
+ dto.setState(outputPort.getScheduledState().toString());
+ outputPorts.add(dto);
+ }
+ }
+
+ // get the root group
+ final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
+ final ProcessGroupCounts counts = rootGroup.getCounts();
+
+ // create the controller dto
+ final ControllerDTO controllerDTO = new ControllerDTO();
+ controllerDTO.setId(controllerFacade.getRootGroupId());
+ controllerDTO.setInstanceId(controllerFacade.getInstanceId());
+ controllerDTO.setName(controllerFacade.getName());
+ controllerDTO.setComments(controllerFacade.getComments());
+ controllerDTO.setInputPorts(inputPorts);
+ controllerDTO.setOutputPorts(outputPorts);
+ controllerDTO.setInputPortCount(inputPorts.size());
+ controllerDTO.setOutputPortCount(outputPorts.size());
+ controllerDTO.setRunningCount(counts.getRunningCount());
+ controllerDTO.setStoppedCount(counts.getStoppedCount());
+ controllerDTO.setInvalidCount(counts.getInvalidCount());
+ controllerDTO.setDisabledCount(counts.getDisabledCount());
+
+ // determine the site to site configuration
+ if (isClustered()) {
+ controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
+ controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
+ } else {
+ controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
+ controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
+ }
+
+ return controllerDTO;
+ }
+
+ @Override
+ public String getInstanceId() {
+ return controllerFacade.getInstanceId();
+ }
+
+ @Override
+ public ControllerConfigurationDTO getControllerConfiguration() {
+ ControllerConfigurationDTO controllerConfig = new ControllerConfigurationDTO();
+ controllerConfig.setName(controllerFacade.getName());
+ controllerConfig.setComments(controllerFacade.getComments());
+ controllerConfig.setMaxTimerDrivenThreadCount(controllerFacade.getMaxTimerDrivenThreadCount());
+ controllerConfig.setMaxEventDrivenThreadCount(controllerFacade.getMaxEventDrivenThreadCount());
+
+ // get the refresh interval
+ final long refreshInterval = FormatUtils.getTimeDuration(properties.getAutoRefreshInterval(), TimeUnit.SECONDS);
+ controllerConfig.setAutoRefreshIntervalSeconds(refreshInterval);
+
+ // get the content viewer url
+ controllerConfig.setContentViewerUrl(properties.getProperty(NiFiProperties.CONTENT_VIEWER_URL));
+
+ final Date now = new Date();
+ controllerConfig.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
+
+ // determine the site to site configuration
+ if (isClustered()) {
+ controllerConfig.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
+ } else {
+ controllerConfig.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
+ }
+
+ return controllerConfig;
+ }
+
+ @Override
+ public Set<LabelDTO> getLabels(String groupId) {
+ Set<LabelDTO> labelDtos = new LinkedHashSet<>();
+ for (Label label : labelDAO.getLabels(groupId)) {
+ labelDtos.add(dtoFactory.createLabelDto(label));
+ }
+ return labelDtos;
+ }
+
+ @Override
+ public LabelDTO getLabel(String groupId, String labelId) {
+ return dtoFactory.createLabelDto(labelDAO.getLabel(groupId, labelId));
+ }
+
+ @Override
+ public Set<FunnelDTO> getFunnels(String groupId) {
+ Set<FunnelDTO> funnelDtos = new LinkedHashSet<>();
+ for (Funnel funnel : funnelDAO.getFunnels(groupId)) {
+ funnelDtos.add(dtoFactory.createFunnelDto(funnel));
+ }
+ return funnelDtos;
+ }
+
+ @Override
+ public FunnelDTO getFunnel(String groupId, String funnelId) {
+ return dtoFactory.createFunnelDto(funnelDAO.getFunnel(groupId, funnelId));
+ }
+
+ @Override
+ public SnippetDTO getSnippet(String snippetId) {
+ final Snippet snippet = snippetDAO.getSnippet(snippetId);
+ final SnippetDTO snippetDTO = dtoFactory.createSnippetDto(snippet);
+ snippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
+ return snippetDTO;
+ }
+
+ @Override
+ public Set<PortDTO> getInputPorts(String groupId) {
+ Set<PortDTO> portDtos = new LinkedHashSet<>();
+ for (Port port : inputPortDAO.getPorts(groupId)) {
+ portDtos.add(dtoFactory.createPortDto(port));
+ }
+ return portDtos;
+ }
+
+ @Override
+ public Set<PortDTO> getOutputPorts(String groupId) {
+ Set<PortDTO> portDtos = new LinkedHashSet<>();
+ for (Port port : outputPortDAO.getPorts(groupId)) {
+ portDtos.add(dtoFactory.createPortDto(port));
+ }
+ return portDtos;
+ }
+
+ @Override
+ public Set<ProcessGroupDTO> getProcessGroups(String parentGroupId) {
+ Set<ProcessGroupDTO> processGroupDtos = new LinkedHashSet<>();
+ for (ProcessGroup groups : processGroupDAO.getProcessGroups(parentGroupId)) {
+ processGroupDtos.add(dtoFactory.createProcessGroupDto(groups));
+ }
+ return processGroupDtos;
+ }
+
+ @Override
+ public Set<RemoteProcessGroupDTO> getRemoteProcessGroups(String groupId) {
+ Set<RemoteProcessGroupDTO> remoteProcessGroupDtos = new LinkedHashSet<>();
+ for (RemoteProcessGroup remoteProcessGroup : remoteProcessGroupDAO.getRemoteProcessGroups(groupId)) {
+ remoteProcessGroupDtos.add(dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
+ }
+ return remoteProcessGroupDtos;
+ }
+
+ @Override
+ public PortDTO getInputPort(String groupId, String inputPortId) {
+ return dtoFactory.createPortDto(inputPortDAO.getPort(groupId, inputPortId));
+ }
+
+ @Override
+ public PortDTO getOutputPort(String groupId, String outputPortId) {
+ return dtoFactory.createPortDto(outputPortDAO.getPort(groupId, outputPortId));
+ }
+
+ @Override
+ public RemoteProcessGroupDTO getRemoteProcessGroup(String groupId, String remoteProcessGroupId) {
+ return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupId));
+ }
+
+ @Override
+ public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String groupId, String id) {
+ return controllerFacade.getRemoteProcessGroupStatusHistory(groupId, id);
+ }
+
+ @Override
+ public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
+ ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+ Long version = optimisticLockingManager.getRevision().getVersion();
+ ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(version, dtoFactory.createProcessGroupDto(processGroup, recurse));
+ return response;
+ }
+
+ @Override
+ public StatusHistoryDTO getProcessGroupStatusHistory(String groupId) {
+ return controllerFacade.getProcessGroupStatusHistory(groupId);
+ }
+
+ @Override
+ public HistoryDTO getActions(HistoryQueryDTO historyQueryDto) {
+ // extract the query criteria
+ HistoryQuery historyQuery = new HistoryQuery();
+ historyQuery.setStartDate(historyQueryDto.getStartDate());
+ historyQuery.setEndDate(historyQueryDto.getEndDate());
+ historyQuery.setSourceId(historyQueryDto.getSourceId());
+ historyQuery.setUserName(historyQueryDto.getUserName());
+ historyQuery.setOffset(historyQueryDto.getOffset());
+ historyQuery.setCount(historyQueryDto.getCount());
+ historyQuery.setSortColumn(historyQueryDto.getSortColumn());
+ historyQuery.setSortOrder(historyQueryDto.getSortOrder());
+
+ // perform the query
+ History history = auditService.getActions(historyQuery);
+
+ // create the response
+ return dtoFactory.createHistoryDto(history);
+ }
+
+ @Override
+ public ActionDTO getAction(Integer actionId) {
+ // get the action
+ Action action = auditService.getAction(actionId);
+
+ // ensure the action was found
+ if (action == null) {
+ throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
+ }
+
+ // return the action
+ return dtoFactory.createActionDto(action);
+ }
+
+ @Override
+ public ProcessorHistoryDTO getProcessorHistory(String processorId) {
+ final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
+ final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(processorId);
+
+ for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
+ final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
+
+ for (final PreviousValue previousValue : entry.getValue()) {
+ final PreviousValueDTO dto = new PreviousValueDTO();
+ dto.setPreviousValue(previousValue.getPreviousValue());
+ dto.setTimestamp(previousValue.getTimestamp());
+ dto.setUserName(previousValue.getUserName());
+ previousValueDtos.add(dto);
+ }
+
+ if (!previousValueDtos.isEmpty()) {
+ final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO();
+ propertyHistoryDto.setPreviousValues(previousValueDtos);
+ propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto);
+ }
+ }
+
+ final ProcessorHistoryDTO history = new ProcessorHistoryDTO();
+ history.setProcessorId(processorId);
+ history.setPropertyHistory(propertyHistoryDtos);
+
+ return history;
+ }
+
+ @Override
+ public UserDTO getUser(String userId) {
+ // get the user
+ NiFiUser user = userService.getUserById(userId);
+
+ // ensure the user was found
+ if (user == null) {
+ throw new ResourceNotFoundException(String.format("Unable to find user with id '%s'.", userId));
+ }
+
+ return dtoFactory.createUserDTO(user);
+ }
+
+ @Override
+ public Collection<UserDTO> getUsers(Boolean grouped) {
+ // get the users
+ final Collection<NiFiUser> users = userService.getUsers();
+ final Collection<UserDTO> userDTOs = new HashSet<>();
+
+ if (grouped) {
+ final Map<String, UserDTO> groupedUserDTOs = new HashMap<>();
+
+ // group the users
+ for (final NiFiUser user : users) {
+ if (StringUtils.isNotBlank(user.getUserGroup())) {
+ if (groupedUserDTOs.containsKey(user.getUserGroup())) {
+ final UserDTO groupedUser = groupedUserDTOs.get(user.getUserGroup());
+ groupedUser.setId(groupedUser.getId() + "," + String.valueOf(user.getId()));
+ groupedUser.setUserName(groupedUser.getUserName() + ", " + user.getUserName());
+
<TRUNCATED>