You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/21 14:05:09 UTC

[57/64] incubator-nifi git commit: Merge branch 'develop' into NIFI-250

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c5d452c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 0000000,dbc4b3c..aabf145
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@@ -1,0 -1,1392 +1,1414 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.web.controller;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.text.Collator;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TimeZone;
+ import java.util.TreeSet;
+ import java.util.concurrent.TimeUnit;
+ 
+ import javax.ws.rs.WebApplicationException;
+ 
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.controller.ContentAvailability;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.Counter;
+ import org.apache.nifi.controller.FlowController;
+ import org.apache.nifi.controller.FlowFileQueue;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.repository.ContentNotFoundException;
+ import org.apache.nifi.controller.repository.claim.ContentDirection;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.controller.status.ProcessGroupStatus;
+ import org.apache.nifi.diagnostics.SystemDiagnostics;
+ import org.apache.nifi.flowfile.FlowFilePrioritizer;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.groups.ProcessGroupCounts;
+ import org.apache.nifi.groups.RemoteProcessGroup;
+ import org.apache.nifi.nar.ExtensionManager;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.provenance.ProvenanceEventRecord;
+ import org.apache.nifi.provenance.ProvenanceEventRepository;
+ import org.apache.nifi.provenance.SearchableFields;
+ import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+ import org.apache.nifi.provenance.search.Query;
+ import org.apache.nifi.provenance.search.QueryResult;
+ import org.apache.nifi.provenance.search.QuerySubmission;
+ import org.apache.nifi.provenance.search.SearchTerm;
+ import org.apache.nifi.provenance.search.SearchTerms;
+ import org.apache.nifi.provenance.search.SearchableField;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.reporting.Bulletin;
+ import org.apache.nifi.reporting.BulletinRepository;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.search.SearchContext;
+ import org.apache.nifi.search.SearchResult;
+ import org.apache.nifi.search.Searchable;
+ import org.apache.nifi.web.security.user.NiFiUserUtils;
+ import org.apache.nifi.services.FlowService;
+ import org.apache.nifi.user.NiFiUser;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.web.NiFiCoreException;
+ import org.apache.nifi.web.ResourceNotFoundException;
+ import org.apache.nifi.web.api.dto.BulletinDTO;
+ import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+ import org.apache.nifi.web.api.dto.DtoFactory;
+ import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
+ import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchableFieldDTO;
+ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
+ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO;
+ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageRequestType;
+ import org.apache.nifi.web.api.dto.search.ComponentSearchResultDTO;
+ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+ import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+ import org.apache.nifi.web.util.DownloadableContent;
+ import org.apache.commons.collections4.CollectionUtils;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.nifi.admin.service.UserService;
+ import org.apache.nifi.authorization.DownloadAuthorization;
+ import org.apache.nifi.processor.DataUnit;
++import org.apache.nifi.reporting.ReportingTask;
++
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.security.access.AccessDeniedException;
+ 
+ /**
+  *
+  */
+ public class ControllerFacade implements ControllerServiceProvider {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
+ 
+     // nifi components
+     private FlowController flowController;
+     private FlowService flowService;
+     private UserService userService;
+ 
+     // properties
+     private NiFiProperties properties;
+     private DtoFactory dtoFactory;
+ 
+     /**
+      * Creates an archive of the current flow.
+      */
+     public void createArchive() {
+         flowService.saveFlowChanges(TimeUnit.SECONDS, 0, true);
+     }
+ 
+     /**
+      * Returns the group id that contains the specified processor.
+      *
+      * @param processorId
+      * @return
+      */
+     public String findProcessGroupIdForProcessor(String processorId) {
+         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+         final ProcessorNode processor = rootGroup.findProcessor(processorId);
+         if (processor == null) {
+             return null;
+         } else {
+             return processor.getProcessGroup().getIdentifier();
+         }
+     }
+ 
+     /**
+      * Sets the name of this controller.
+      *
+      * @param name
+      */
+     public void setName(String name) {
+         flowController.setName(name);
+     }
+ 
+     /**
+      * Sets the comments of this controller.
+      *
+      * @param comments
+      */
+     public void setComments(String comments) {
+         flowController.setComments(comments);
+     }
+ 
+     /**
+      * Sets the max timer driven thread count of this controller.
+      *
+      * @param maxTimerDrivenThreadCount
+      */
+     public void setMaxTimerDrivenThreadCount(int maxTimerDrivenThreadCount) {
+         flowController.setMaxTimerDrivenThreadCount(maxTimerDrivenThreadCount);
+     }
+ 
+     /**
+      * Sets the max event driven thread count of this controller.
+      *
+      * @param maxEventDrivenThreadCount
+      */
+     public void setMaxEventDrivenThreadCount(int maxEventDrivenThreadCount) {
+         flowController.setMaxEventDrivenThreadCount(maxEventDrivenThreadCount);
+     }
+ 
+     /**
+      * Gets the root group id.
+      *
+      * @return
+      */
+     public String getRootGroupId() {
+         return flowController.getRootGroupId();
+     }
+ 
+     /**
+      * Gets the input ports on the root group.
+      *
+      * @return
+      */
+     public Set<RootGroupPort> getInputPorts() {
+         final Set<RootGroupPort> inputPorts = new HashSet<>();
+         ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+         for (final Port port : rootGroup.getInputPorts()) {
+             if (port instanceof RootGroupPort) {
+                 inputPorts.add((RootGroupPort) port);
+             }
+         }
+         return inputPorts;
+     }
+ 
+     /**
+      * Gets the output ports on the root group.
+      *
+      * @return
+      */
+     public Set<RootGroupPort> getOutputPorts() {
+         final Set<RootGroupPort> outputPorts = new HashSet<>();
+         ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+         for (final Port port : rootGroup.getOutputPorts()) {
+             if (port instanceof RootGroupPort) {
+                 outputPorts.add((RootGroupPort) port);
+             }
+         }
+         return outputPorts;
+     }
+ 
+     /**
+      * Returns the status history for the specified processor.
+      *
+      * @param groupId
+      * @param processorId
+      * @return
+      */
+     public StatusHistoryDTO getProcessorStatusHistory(final String groupId, final String processorId) {
+         return flowController.getProcessorStatusHistory(processorId);
+     }
+ 
+     /**
+      * Returns the status history for the specified connection.
+      *
+      * @param groupId
+      * @param connectionId
+      * @return
+      */
+     public StatusHistoryDTO getConnectionStatusHistory(final String groupId, final String connectionId) {
+         return flowController.getConnectionStatusHistory(connectionId);
+     }
+ 
+     /**
+      * Returns the status history for the specified process group.
+      *
+      * @param groupId
+      * @return
+      */
+     public StatusHistoryDTO getProcessGroupStatusHistory(final String groupId) {
+         return flowController.getProcessGroupStatusHistory(groupId);
+     }
+ 
+     /**
+      * Returns the status history for the specified remote process group.
+      *
+      * @param groupId
+      * @param remoteProcessGroupId
+      * @return
+      */
+     public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String groupId, final String remoteProcessGroupId) {
+         return flowController.getRemoteProcessGroupStatusHistory(remoteProcessGroupId);
+     }
+ 
+     /**
+      * Get the node id of this controller.
+      *
+      * @return
+      */
+     public NodeIdentifier getNodeId() {
+         return flowController.getNodeId();
+     }
+ 
+     public boolean isClustered() {
+         return flowController.isClustered();
+     }
+ 
+     /**
+      * Gets the name of this controller.
+      *
+      * @return
+      */
+     public String getName() {
+         return flowController.getName();
+     }
+ 
+     public String getInstanceId() {
+         return flowController.getInstanceId();
+     }
+ 
+     /**
+      * Gets the comments of this controller.
+      *
+      * @return
+      */
+     public String getComments() {
+         return flowController.getComments();
+     }
+ 
+     /**
+      * Gets the max timer driven thread count of this controller.
+      *
+      * @return
+      */
+     public int getMaxTimerDrivenThreadCount() {
+         return flowController.getMaxTimerDrivenThreadCount();
+     }
+ 
+     /**
+      * Gets the max event driven thread count of this controller.
+      *
+      * @return
+      */
+     public int getMaxEventDrivenThreadCount() {
+         return flowController.getMaxEventDrivenThreadCount();
+     }
+ 
+     /**
+      * Gets the FlowFileProcessor types that this controller supports.
+      *
+      * @return
+      */
+     public Set<DocumentedTypeDTO> getFlowFileProcessorTypes() {
+         return dtoFactory.fromDocumentedTypes(ExtensionManager.getExtensions(Processor.class));
+     }
+ 
+     /**
+      * Gets the FlowFileComparator types that this controller supports.
+      *
+      * @return
+      */
+     public Set<DocumentedTypeDTO> getFlowFileComparatorTypes() {
+         return dtoFactory.fromDocumentedTypes(ExtensionManager.getExtensions(FlowFilePrioritizer.class));
+     }
++    
++    /**
++     * Gets the ControllerService types that this controller supports.
++     * 
++     * @return 
++     */
++    public Set<DocumentedTypeDTO> getControllerServiceTypes() {
++        return dtoFactory.fromDocumentedTypes(ControllerService.class, ExtensionManager.getExtensions(ControllerService.class));
++    }
++    
++    /**
++     * Gets the ReportingTask types that this controller supports.
++     * 
++     * @return 
++     */
++    public Set<DocumentedTypeDTO> getReportingTaskTypes() {
++        return dtoFactory.fromDocumentedTypes(ReportingTask.class, ExtensionManager.getExtensions(ReportingTask.class));
++    }
+ 
+     /**
+      * Gets the counters for this controller.
+      *
+      * @return
+      */
+     public List<Counter> getCounters() {
+         return flowController.getCounters();
+     }
+ 
+     /**
+      * Resets the counter with the specified id.
+      * @param id
+      * @return 
+      */
+     public Counter resetCounter(final String id) {
+         final Counter counter = flowController.resetCounter(id);
+ 
+         if (counter == null) {
+             throw new ResourceNotFoundException(String.format("Unable to find Counter with id '%s'.", id));
+         }
+ 
+         return counter;
+     }
++    
++    
+ 
+     /**
+      * Return the controller service for the specified identifier.
+      *
+      * @param serviceIdentifier
+      * @return
+      */
+     @Override
+     public ControllerService getControllerService(String serviceIdentifier) {
+         return flowController.getControllerService(serviceIdentifier);
+     }
+ 
+     @Override
+     public ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties) {
+         return flowController.createControllerService(type, id, properties);
+     }
+ 
+     @Override
+     public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) {
+         return flowController.getControllerServiceIdentifiers(serviceType);
+     }
+ 
+     @Override
+     public ControllerServiceNode getControllerServiceNode(final String id) {
+         return flowController.getControllerServiceNode(id);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService service) {
+         return flowController.isControllerServiceEnabled(service);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+         return flowController.isControllerServiceEnabled(serviceIdentifier);
+     }
+ 
+     /**
+      * Gets the status of this controller.
+      *
+      * @return
+      */
+     public ControllerStatusDTO getControllerStatus() {
+         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ 
+         final QueueSize controllerQueueSize = flowController.getTotalFlowFileCount(rootGroup);
+         final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
+         controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
+         controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
+ 
+         final BulletinRepository bulletinRepository = getBulletinRepository();
+         final List<Bulletin> results = bulletinRepository.findBulletinsForController();
+         final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
+         for (final Bulletin bulletin : results) {
+             bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
+         }
+         controllerStatus.setBulletins(bulletinDtos);
+ 
+         final ProcessGroupCounts counts = rootGroup.getCounts();
+         controllerStatus.setRunningCount(counts.getRunningCount());
+         controllerStatus.setStoppedCount(counts.getStoppedCount());
+         controllerStatus.setInvalidCount(counts.getInvalidCount());
+         controllerStatus.setDisabledCount(counts.getDisabledCount());
+         controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount());
+         controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
+ 
+         return controllerStatus;
+     }
+ 
+     /**
+      * Gets the status for the specified process group.
+      *
+      * @param groupId
+      * @return
+      */
+     public ProcessGroupStatusDTO getProcessGroupStatus(final String groupId) {
+         final ProcessGroupStatus processGroupStatus = flowController.getGroupStatus(groupId);
+         if (processGroupStatus == null) {
+             throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+         }
+         return dtoFactory.createProcessGroupStatusDto(flowController.getBulletinRepository(), processGroupStatus);
+     }
+ 
+     /**
+      * Gets the BulletinRepository.
+      *
+      * @return
+      */
+     public BulletinRepository getBulletinRepository() {
+         return flowController.getBulletinRepository();
+     }
+ 
+     /**
+      * Saves the state of the flow controller.
+      *
+      * @throws NiFiCoreException
+      */
+     public void save() throws NiFiCoreException {
+         // save the flow controller
+         final long writeDelaySeconds = FormatUtils.getTimeDuration(properties.getFlowServiceWriteDelay(), TimeUnit.SECONDS);
+         flowService.saveFlowChanges(TimeUnit.SECONDS, writeDelaySeconds);
+     }
+ 
+     /**
+      * Returns the socket port that the Cluster Manager is listening on for
+      * Site-to-Site communications
+      *
+      * @return
+      */
+     public Integer getClusterManagerRemoteSiteListeningPort() {
+         return flowController.getClusterManagerRemoteSiteListeningPort();
+     }
+ 
+     /**
+      * Indicates whether or not Site-to-Site communications with the Cluster
+      * Manager are secure
+      *
+      * @return
+      */
+     public Boolean isClusterManagerRemoteSiteCommsSecure() {
+         return flowController.isClusterManagerRemoteSiteCommsSecure();
+     }
+ 
+     /**
+      * Returns the socket port that the local instance is listening on for
+      * Site-to-Site communications
+      *
+      * @return
+      */
+     public Integer getRemoteSiteListeningPort() {
+         return flowController.getRemoteSiteListeningPort();
+     }
+ 
+     /**
+      * Indicates whether or not Site-to-Site communications with the local
+      * instance are secure
+      *
+      * @return
+      */
+     public Boolean isRemoteSiteCommsSecure() {
+         return flowController.isRemoteSiteCommsSecure();
+     }
+ 
+     /**
+      * Returns a SystemDiagnostics that describes the current state of the node
+      *
+      * @return
+      */
+     public SystemDiagnostics getSystemDiagnostics() {
+         return flowController.getSystemDiagnostics();
+     }
+ 
+     /**
+      * Gets the available options for searching provenance.
+      *
+      * @return
+      */
+     public ProvenanceOptionsDTO getProvenanceSearchOptions() {
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+ 
+         // create the search options dto
+         final ProvenanceOptionsDTO searchOptions = new ProvenanceOptionsDTO();
+         final List<ProvenanceSearchableFieldDTO> searchableFieldNames = new ArrayList<>();
+         final List<SearchableField> fields = provenanceRepository.getSearchableFields();
+         for (final SearchableField field : fields) {
+             final ProvenanceSearchableFieldDTO searchableField = new ProvenanceSearchableFieldDTO();
+             searchableField.setId(field.getIdentifier());
+             searchableField.setField(field.getSearchableFieldName());
+             searchableField.setLabel(field.getFriendlyName());
+             searchableField.setType(field.getFieldType().name());
+             searchableFieldNames.add(searchableField);
+         }
+         final List<SearchableField> searchableAttributes = provenanceRepository.getSearchableAttributes();
+         for (final SearchableField searchableAttr : searchableAttributes) {
+             final ProvenanceSearchableFieldDTO searchableAttribute = new ProvenanceSearchableFieldDTO();
+             searchableAttribute.setId(searchableAttr.getIdentifier());
+             searchableAttribute.setField(searchableAttr.getSearchableFieldName());
+             searchableAttribute.setLabel(searchableAttr.getFriendlyName());
+             searchableAttribute.setType(searchableAttr.getFieldType().name());
+             searchableFieldNames.add(searchableAttribute);
+         }
+         searchOptions.setSearchableFields(searchableFieldNames);
+         return searchOptions;
+     }
+ 
+     /**
+      * Submits a provenance query.
+      *
+      * @param provenanceDto
+      * @return
+      */
+     public ProvenanceDTO submitProvenance(ProvenanceDTO provenanceDto) {
+         final ProvenanceRequestDTO requestDto = provenanceDto.getRequest();
+ 
+         // create the query
+         final Query query = new Query(provenanceDto.getId());
+ 
+         // if the request was specified
+         if (requestDto != null) {
+             // add each search term specified
+             final Map<String, String> searchTerms = requestDto.getSearchTerms();
+             if (searchTerms != null) {
+                 for (final Map.Entry<String, String> searchTerm : searchTerms.entrySet()) {
+                     SearchableField field;
+ 
+                     field = SearchableFields.getSearchableField(searchTerm.getKey());
+                     if (field == null) {
+                         field = SearchableFields.newSearchableAttribute(searchTerm.getKey());
+                     }
+                     query.addSearchTerm(SearchTerms.newSearchTerm(field, searchTerm.getValue()));
+                 }
+             }
+ 
+             // specify the start date if specified
+             if (requestDto.getStartDate() != null) {
+                 query.setStartDate(requestDto.getStartDate());
+             }
+ 
+             // ensure an end date is populated
+             if (requestDto.getEndDate() != null) {
+                 query.setEndDate(requestDto.getEndDate());
+             }
+ 
+             // set the min/max file size
+             query.setMinFileSize(requestDto.getMinimumFileSize());
+             query.setMaxFileSize(requestDto.getMaximumFileSize());
+ 
+             // set the max results desired
+             query.setMaxResults(requestDto.getMaxResults());
+         }
+ 
+         // submit the query to the provenance repository
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+         final QuerySubmission querySubmission = provenanceRepository.submitQuery(query);
+ 
+         // return the query with the results populated at this point
+         return getProvenanceQuery(querySubmission.getQueryIdentifier());
+     }
+ 
+     /**
+      * Retrieves the results of a provenance query.
+      *
+      * @param provenanceId
+      * @return
+      */
+     public ProvenanceDTO getProvenanceQuery(String provenanceId) {
+         try {
+             // get the query to the provenance repository
+             final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+             final QuerySubmission querySubmission = provenanceRepository.retrieveQuerySubmission(provenanceId);
+ 
+             // ensure the query results could be found
+             if (querySubmission == null) {
+                 throw new ResourceNotFoundException("Cannot find the results for the specified provenance requests. Results may have been purged.");
+             }
+ 
+             // get the original query and the results
+             final Query query = querySubmission.getQuery();
+             final QueryResult queryResult = querySubmission.getResult();
+ 
+             // build the response
+             final ProvenanceDTO provenanceDto = new ProvenanceDTO();
+             final ProvenanceRequestDTO requestDto = new ProvenanceRequestDTO();
+             final ProvenanceResultsDTO resultsDto = new ProvenanceResultsDTO();
+ 
+             // include the original request and results
+             provenanceDto.setRequest(requestDto);
+             provenanceDto.setResults(resultsDto);
+ 
+             // convert the original request
+             requestDto.setStartDate(query.getStartDate());
+             requestDto.setEndDate(query.getEndDate());
+             requestDto.setMinimumFileSize(query.getMinFileSize());
+             requestDto.setMaximumFileSize(query.getMaxFileSize());
+             requestDto.setMaxResults(query.getMaxResults());
+             if (query.getSearchTerms() != null) {
+                 final Map<String, String> searchTerms = new HashMap<>();
+                 for (final SearchTerm searchTerm : query.getSearchTerms()) {
+                     searchTerms.put(searchTerm.getSearchableField().getFriendlyName(), searchTerm.getValue());
+                 }
+                 requestDto.setSearchTerms(searchTerms);
+             }
+ 
+             // convert the provenance
+             provenanceDto.setId(query.getIdentifier());
+             provenanceDto.setSubmissionTime(querySubmission.getSubmissionTime());
+             provenanceDto.setExpiration(queryResult.getExpiration());
+             provenanceDto.setFinished(queryResult.isFinished());
+             provenanceDto.setPercentCompleted(queryResult.getPercentComplete());
+ 
+             // convert each event
+             final List<ProvenanceEventDTO> events = new ArrayList<>();
+             for (final ProvenanceEventRecord record : queryResult.getMatchingEvents()) {
+                 events.add(createProvenanceEventDto(record));
+             }
+             resultsDto.setProvenanceEvents(events);
+             resultsDto.setTotalCount(queryResult.getTotalHitCount());
+             resultsDto.setTotal(FormatUtils.formatCount(queryResult.getTotalHitCount()));
+ 
+             // include any errors
+             if (queryResult.getError() != null) {
+                 final Set<String> errors = new HashSet<>();
+                 errors.add(queryResult.getError());
+                 resultsDto.setErrors(errors);
+             }
+ 
+             // set the generated timestamp
+             final Date now = new Date();
+             resultsDto.setGenerated(now);
+             resultsDto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
+ 
+             // get the oldest available event time
+             final List<ProvenanceEventRecord> firstEvent = provenanceRepository.getEvents(0, 1);
+             if (!firstEvent.isEmpty()) {
+                 resultsDto.setOldestEvent(new Date(firstEvent.get(0).getEventTime()));
+             }
+ 
+             provenanceDto.setResults(resultsDto);
+             return provenanceDto;
+         } catch (final IOException ioe) {
+             throw new NiFiCoreException("An error occured while searching the provenance events.", ioe);
+         }
+     }
+ 
+     /**
+      * Submits the specified lineage request.
+      *
+      * @param lineageDto
+      * @return
+      */
+     public LineageDTO submitLineage(LineageDTO lineageDto) {
+         final LineageRequestDTO requestDto = lineageDto.getRequest();
+ 
+         // get the provenance repo
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+         final ComputeLineageSubmission result;
+ 
+         // submit the event
+         if (LineageRequestType.FLOWFILE.equals(requestDto.getLineageRequestType())) {
+             // submit uuid
+             result = provenanceRepository.submitLineageComputation(requestDto.getUuid());
+         } else {
+             // submit event... (parents or children)
+             if (LineageRequestType.PARENTS.equals(requestDto.getLineageRequestType())) {
+                 result = provenanceRepository.submitExpandParents(requestDto.getEventId());
+             } else {
+                 result = provenanceRepository.submitExpandChildren(requestDto.getEventId());
+             }
+         }
+ 
+         return getLineage(result.getLineageIdentifier());
+     }
+ 
+     /**
+      * Gets the lineage with the specified id.
+      *
+      * @param lineageId
+      * @return
+      */
+     public LineageDTO getLineage(final String lineageId) {
+         // get the query to the provenance repository
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+         final ComputeLineageSubmission computeLineageSubmission = provenanceRepository.retrieveLineageSubmission(lineageId);
+ 
+         // ensure the submission was found
+         if (computeLineageSubmission == null) {
+             throw new ResourceNotFoundException("Cannot find the results for the specified lineage request. Results may have been purged.");
+         }
+ 
+         return dtoFactory.createLineageDto(computeLineageSubmission);
+     }
+ 
+     /**
+      * Deletes the query with the specified id.
+      *
+      * @param provenanceId
+      */
+     public void deleteProvenanceQuery(final String provenanceId) {
+         // get the query to the provenance repository
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+         final QuerySubmission querySubmission = provenanceRepository.retrieveQuerySubmission(provenanceId);
+         if (querySubmission != null) {
+             querySubmission.cancel();
+         }
+     }
+ 
+     /**
+      * Deletes the lineage with the specified id.
+      *
+      * @param lineageId
+      */
+     public void deleteLineage(final String lineageId) {
+         // get the query to the provenance repository
+         final ProvenanceEventRepository provenanceRepository = flowController.getProvenanceRepository();
+         final ComputeLineageSubmission computeLineageSubmission = provenanceRepository.retrieveLineageSubmission(lineageId);
+         if (computeLineageSubmission != null) {
+             computeLineageSubmission.cancel();
+         }
+     }
+ 
+     /**
+      * Gets the content for the specified claim.
+      *
+      * @param eventId
+      * @param uri
+      * @param contentDirection
+      * @return
+      */
+     public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) {
+         try {
+             final NiFiUser user = NiFiUserUtils.getNiFiUser();
+             if (user == null) {
+                 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+             }
+ 
+             // get the event in order to get the filename
+             final ProvenanceEventRecord event = flowController.getProvenanceRepository().getEvent(eventId);
+             if (event == null) {
+                 throw new ResourceNotFoundException("Unable to find the specified event.");
+             }
+ 
+             // get the flowfile attributes
+             final Map<String, String> attributes = event.getAttributes();
+ 
+             // calculate the dn chain
+             final List<String> dnChain = new ArrayList<>();
+ 
+             // build the dn chain
+             NiFiUser chainedUser = user;
+             do {
+                 // add the entry for this user
+                 dnChain.add(chainedUser.getDn());
+ 
+                 // go to the next user in the chain
+                 chainedUser = chainedUser.getChain();
+             } while (chainedUser != null);
+ 
+             // ensure the users in this chain are allowed to download this content
+             final DownloadAuthorization downloadAuthorization = userService.authorizeDownload(dnChain, attributes);
+             if (!downloadAuthorization.isApproved()) {
+                 throw new AccessDeniedException(downloadAuthorization.getExplanation());
+             }
+             
+             // get the filename and fall back to the idnetifier (should never happen)
+             String filename = event.getAttributes().get(CoreAttributes.FILENAME.key());
+             if (filename == null) {
+                 filename = event.getFlowFileUuid();
+             }
+ 
+             // get the mime-type
+             final String type = event.getAttributes().get(CoreAttributes.MIME_TYPE.key());
+ 
+             // get the content
+             final InputStream content = flowController.getContent(event, contentDirection, user.getDn(), uri);
+             return new DownloadableContent(filename, type, content);
+         } catch (final ContentNotFoundException cnfe) {
+             throw new ResourceNotFoundException("Unable to find the specified content.");
+         } catch (final IOException ioe) {
+             logger.error(String.format("Unable to get the content for event (%s) at this time.", eventId), ioe);
+             throw new IllegalStateException("Unable to get the content at this time.");
+         }
+     }
+ 
+     /**
+      * Submits a replay request for the specified event id.
+      *
+      * @param eventId
+      * @return
+      */
+     public ProvenanceEventDTO submitReplay(final Long eventId) {
+         try {
+             final NiFiUser user = NiFiUserUtils.getNiFiUser();
+             if (user == null) {
+                 throw new WebApplicationException(new Throwable("Unable to access details for current user."));
+             }
+ 
+             // lookup the original event
+             final ProvenanceEventRecord originalEvent = flowController.getProvenanceRepository().getEvent(eventId);
+             if (originalEvent == null) {
+                 throw new ResourceNotFoundException("Unable to find the specified event.");
+             }
+ 
+             // replay the flow file
+             final ProvenanceEventRecord event = flowController.replayFlowFile(originalEvent, user.getDn());
+ 
+             // convert the event record
+             return createProvenanceEventDto(event);
+         } catch (final IOException ioe) {
+             throw new NiFiCoreException("An error occured while getting the specified event.", ioe);
+         }
+     }
+ 
+     /**
+      * Get the provenance event with the specified event id.
+      *
+      * @param eventId
+      * @return
+      */
+     public ProvenanceEventDTO getProvenanceEvent(final Long eventId) {
+         try {
+             final ProvenanceEventRecord event = flowController.getProvenanceRepository().getEvent(eventId);
+             if (event == null) {
+                 throw new ResourceNotFoundException("Unable to find the specified event.");
+             }
+ 
+             // convert the event
+             return createProvenanceEventDto(event);
+         } catch (final IOException ioe) {
+             throw new NiFiCoreException("An error occured while getting the specified event.", ioe);
+         }
+     }
+ 
+     /**
+      * Creates a ProvenanceEventDTO for the specified ProvenanceEventRecord.
+      *
+      * @param event
+      * @return
+      */
+     private ProvenanceEventDTO createProvenanceEventDto(final ProvenanceEventRecord event) {
+         // convert the attributes
+         final Comparator<AttributeDTO> attributeComparator = new Comparator<AttributeDTO>() {
+             @Override
+             public int compare(AttributeDTO a1, AttributeDTO a2) {
+                 return Collator.getInstance(Locale.US).compare(a1.getName(), a2.getName());
+             }
+         };
+ 
+         final SortedSet<AttributeDTO> attributes = new TreeSet<>(attributeComparator);
+ 
+         final Map<String, String> updatedAttrs = event.getUpdatedAttributes();
+         final Map<String, String> previousAttrs = event.getPreviousAttributes();
+ 
+         // add previous attributes that haven't been modified.
+         for (final Map.Entry<String, String> entry : previousAttrs.entrySet()) {
+             // don't add any attributes that have been updated; we will do that next
+             if (updatedAttrs.containsKey(entry.getKey())) {
+                 continue;
+             }
+ 
+             final AttributeDTO attribute = new AttributeDTO();
+             attribute.setName(entry.getKey());
+             attribute.setValue(entry.getValue());
+             attribute.setPreviousValue(entry.getValue());
+             attributes.add(attribute);
+         }
+ 
+         // Add all of the update attributes
+         for (final Map.Entry<String, String> entry : updatedAttrs.entrySet()) {
+             final AttributeDTO attribute = new AttributeDTO();
+             attribute.setName(entry.getKey());
+             attribute.setValue(entry.getValue());
+             attribute.setPreviousValue(previousAttrs.get(entry.getKey()));
+             attributes.add(attribute);
+         }
+ 
+         // build the event dto
+         final ProvenanceEventDTO dto = new ProvenanceEventDTO();
+         dto.setId(String.valueOf(event.getEventId()));
+         dto.setAlternateIdentifierUri(event.getAlternateIdentifierUri());
+         dto.setAttributes(attributes);
+         dto.setTransitUri(event.getTransitUri());
+         dto.setEventId(event.getEventId());
+         dto.setEventTime(new Date(event.getEventTime()));
+         dto.setEventType(event.getEventType().name());
+         dto.setFileSize(FormatUtils.formatDataSize(event.getFileSize()));
+         dto.setFileSizeBytes(event.getFileSize());
+         dto.setComponentId(event.getComponentId());
+         dto.setComponentType(event.getComponentType());
+         dto.setSourceSystemFlowFileId(event.getSourceSystemFlowFileIdentifier());
+         dto.setFlowFileUuid(event.getFlowFileUuid());
+         dto.setRelationship(event.getRelationship());
+         dto.setDetails(event.getDetails());
+ 
+         final ContentAvailability contentAvailability = flowController.getContentAvailability(event);
+ 
+         // content
+         dto.setContentEqual(contentAvailability.isContentSame());
+         dto.setInputContentAvailable(contentAvailability.isInputAvailable());
+         dto.setInputContentClaimSection(event.getPreviousContentClaimSection());
+         dto.setInputContentClaimContainer(event.getPreviousContentClaimContainer());
+         dto.setInputContentClaimIdentifier(event.getPreviousContentClaimIdentifier());
+         dto.setInputContentClaimOffset(event.getPreviousContentClaimOffset());
+         dto.setInputContentClaimFileSizeBytes(event.getPreviousFileSize());
+         dto.setOutputContentAvailable(contentAvailability.isOutputAvailable());
+         dto.setOutputContentClaimSection(event.getContentClaimSection());
+         dto.setOutputContentClaimContainer(event.getContentClaimContainer());
+         dto.setOutputContentClaimIdentifier(event.getContentClaimIdentifier());
+         dto.setOutputContentClaimOffset(event.getContentClaimOffset());
+         dto.setOutputContentClaimFileSize(FormatUtils.formatDataSize(event.getFileSize()));
+         dto.setOutputContentClaimFileSizeBytes(event.getFileSize());
+ 
+         // format the previous file sizes if possible
+         if (event.getPreviousFileSize() != null) {
+             dto.setInputContentClaimFileSize(FormatUtils.formatDataSize(event.getPreviousFileSize()));
+         }
+ 
+         // replay
+         dto.setReplayAvailable(contentAvailability.isReplayable());
+         dto.setReplayExplanation(contentAvailability.getReasonNotReplayable());
+         dto.setSourceConnectionIdentifier(event.getSourceQueueIdentifier());
+ 
+         // sets the component details if it can find the component still in the flow
+         setComponentDetails(dto);
+ 
+         // event duration
+         if (event.getEventDuration() >= 0) {
+             dto.setEventDuration(event.getEventDuration());
+         }
+ 
+         // lineage duration
+         if (event.getLineageStartDate() > 0) {
+             final long lineageDuration = event.getEventTime() - event.getLineageStartDate();
+             dto.setLineageDuration(lineageDuration);
+         }
+ 
+         // parent uuids
+         final List<String> parentUuids = new ArrayList<>(event.getParentUuids());
+         Collections.sort(parentUuids, Collator.getInstance(Locale.US));
+         dto.setParentUuids(parentUuids);
+ 
+         // child uuids
+         final List<String> childUuids = new ArrayList<>(event.getChildUuids());
+         Collections.sort(childUuids, Collator.getInstance(Locale.US));
+         dto.setChildUuids(childUuids);
+ 
+         return dto;
+     }
+ 
+     /**
+      * Gets the name for the component with the specified id.
+      *
+      * @param dto
+      * @return
+      */
+     private void setComponentDetails(final ProvenanceEventDTO dto) {
+         final ProcessGroup root = flowController.getGroup(flowController.getRootGroupId());
+ 
+         final Connectable connectable = root.findConnectable(dto.getComponentId());
+         if (connectable != null) {
+             dto.setGroupId(connectable.getProcessGroup().getIdentifier());
+             dto.setComponentName(connectable.getName());
+         }
+     }
+ 
+     /**
+      * Searches this controller for the specified term.
+      *
+      * @param search
+      * @return
+      */
+     public SearchResultsDTO search(final String search) {
+         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
+ 
+         final SearchResultsDTO results = new SearchResultsDTO();
+         search(results, search, rootGroup);
+ 
+         return results;
+     }
+ 
+     private void search(final SearchResultsDTO results, final String search, final ProcessGroup group) {
+         final ComponentSearchResultDTO groupMatch = search(search, group);
+         if (groupMatch != null) {
+             results.getProcessGroupResults().add(groupMatch);
+         }
+ 
+         for (final ProcessorNode procNode : group.getProcessors()) {
+             final ComponentSearchResultDTO match = search(search, procNode);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getProcessorResults().add(match);
+             }
+         }
+ 
+         for (final Connection connection : group.getConnections()) {
+             final ComponentSearchResultDTO match = search(search, connection);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getConnectionResults().add(match);
+             }
+         }
+ 
+         for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
+             final ComponentSearchResultDTO match = search(search, remoteGroup);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getRemoteProcessGroupResults().add(match);
+             }
+         }
+ 
+         for (final Port port : group.getInputPorts()) {
+             final ComponentSearchResultDTO match = search(search, port);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getInputPortResults().add(match);
+             }
+         }
+ 
+         for (final Port port : group.getOutputPorts()) {
+             final ComponentSearchResultDTO match = search(search, port);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getOutputPortResults().add(match);
+             }
+         }
+ 
+         for (final Funnel funnel : group.getFunnels()) {
+             final ComponentSearchResultDTO match = search(search, funnel);
+             if (match != null) {
+                 match.setGroupId(group.getIdentifier());
+                 results.getFunnelResults().add(match);
+             }
+         }
+ 
+         for (final ProcessGroup processGroup : group.getProcessGroups()) {
+             search(results, search, processGroup);
+         }
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final Port port) {
+         final List<String> matches = new ArrayList<>();
+ 
+         addIfAppropriate(searchStr, port.getIdentifier(), "Id", matches);
+         addIfAppropriate(searchStr, port.getName(), "Name", matches);
+         addIfAppropriate(searchStr, port.getComments(), "Comments", matches);
+ 
+         // consider scheduled state
+         if (ScheduledState.DISABLED.equals(port.getScheduledState())) {
+             if (StringUtils.containsIgnoreCase("disabled", searchStr)) {
+                 matches.add("Run status: Disabled");
+             }
+         } else {
+             if (StringUtils.containsIgnoreCase("invalid", searchStr) && !port.isValid()) {
+                 matches.add("Run status: Invalid");
+             } else if (ScheduledState.RUNNING.equals(port.getScheduledState()) && StringUtils.containsIgnoreCase("running", searchStr)) {
+                 matches.add("Run status: Running");
+             } else if (ScheduledState.STOPPED.equals(port.getScheduledState()) && StringUtils.containsIgnoreCase("stopped", searchStr)) {
+                 matches.add("Run status: Stopped");
+             }
+         }
+ 
+         if (port instanceof RootGroupPort) {
+             final RootGroupPort rootGroupPort = (RootGroupPort) port;
+ 
+             // user access controls
+             for (final String userAccessControl : rootGroupPort.getUserAccessControl()) {
+                 addIfAppropriate(searchStr, userAccessControl, "User access control", matches);
+             }
+ 
+             // group access controls
+             for (final String groupAccessControl : rootGroupPort.getGroupAccessControl()) {
+                 addIfAppropriate(searchStr, groupAccessControl, "Group access control", matches);
+             }
+         }
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO dto = new ComponentSearchResultDTO();
+         dto.setId(port.getIdentifier());
+         dto.setName(port.getName());
+         dto.setMatches(matches);
+         return dto;
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final ProcessorNode procNode) {
+         final List<String> matches = new ArrayList<>();
+         final Processor processor = procNode.getProcessor();
+ 
+         addIfAppropriate(searchStr, procNode.getIdentifier(), "Id", matches);
+         addIfAppropriate(searchStr, procNode.getName(), "Name", matches);
+         addIfAppropriate(searchStr, procNode.getComments(), "Comments", matches);
+ 
+         // consider scheduling strategy
+         if (SchedulingStrategy.EVENT_DRIVEN.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("event", searchStr)) {
+             matches.add("Scheduling strategy: Event driven");
+         } else if (SchedulingStrategy.TIMER_DRIVEN.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("timer", searchStr)) {
+             matches.add("Scheduling strategy: Timer driven");
+         } else if (SchedulingStrategy.PRIMARY_NODE_ONLY.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("primary", searchStr)) {
+             matches.add("Scheduling strategy: On primary node");
+         }
+ 
+         // consider scheduled state
+         if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
+             if (StringUtils.containsIgnoreCase("disabled", searchStr)) {
+                 matches.add("Run status: Disabled");
+             }
+         } else {
+             if (StringUtils.containsIgnoreCase("invalid", searchStr) && !procNode.isValid()) {
+                 matches.add("Run status: Invalid");
+             } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState()) && StringUtils.containsIgnoreCase("running", searchStr)) {
+                 matches.add("Run status: Running");
+             } else if (ScheduledState.STOPPED.equals(procNode.getScheduledState()) && StringUtils.containsIgnoreCase("stopped", searchStr)) {
+                 matches.add("Run status: Stopped");
+             }
+         }
+ 
+         for (final Relationship relationship : procNode.getRelationships()) {
+             addIfAppropriate(searchStr, relationship.getName(), "Relationship", matches);
+         }
+         addIfAppropriate(searchStr, processor.getClass().getSimpleName(), "Type", matches);
+ 
+         for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
+             final PropertyDescriptor descriptor = entry.getKey();
+             final String value = entry.getValue();
+             if (StringUtils.containsIgnoreCase(value, searchStr)) {
+                 matches.add("Property: " + descriptor.getName() + " - " + value);
+             }
+ 
+             addIfAppropriate(searchStr, descriptor.getName(), "Property", matches);
+             addIfAppropriate(searchStr, descriptor.getDescription(), "Property", matches);
+         }
+ 
+         // consider searching the processor directly
+         if (processor instanceof Searchable) {
+             final Searchable searchable = (Searchable) processor;
+ 
+             // prepare the search context
+             final SearchContext context = new StandardSearchContext(searchStr, procNode, flowController);
+ 
+             // search the processor using the appropriate thread context classloader
+             try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                 final Collection<SearchResult> searchResults = searchable.search(context);
+                 if (CollectionUtils.isNotEmpty(searchResults)) {
+                     for (final SearchResult searchResult : searchResults) {
+                         matches.add(searchResult.getLabel() + ": " + searchResult.getMatch());
+                     }
+                 }
+             } catch (final Throwable t) {
+                 // log this as error
+             }
+         }
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO result = new ComponentSearchResultDTO();
+         result.setId(procNode.getIdentifier());
+         result.setMatches(matches);
+         result.setName(procNode.getName());
+         return result;
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final ProcessGroup group) {
+         final List<String> matches = new ArrayList<>();
+         final ProcessGroup parent = group.getParent();
+         if (parent == null) {
+             return null;
+         }
+ 
+         addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+         addIfAppropriate(searchStr, group.getName(), "Name", matches);
+         addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO result = new ComponentSearchResultDTO();
+         result.setId(group.getIdentifier());
+         result.setName(group.getName());
+         result.setGroupId(parent.getIdentifier());
+         result.setMatches(matches);
+         return result;
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final Connection connection) {
+         final List<String> matches = new ArrayList<>();
+ 
+         // search id and name
+         addIfAppropriate(searchStr, connection.getIdentifier(), "Id", matches);
+         addIfAppropriate(searchStr, connection.getName(), "Name", matches);
+ 
+         // search relationships
+         for (final Relationship relationship : connection.getRelationships()) {
+             addIfAppropriate(searchStr, relationship.getName(), "Relationship", matches);
+         }
+ 
+         // search prioritizers
+         final FlowFileQueue queue = connection.getFlowFileQueue();
+         for (final FlowFilePrioritizer comparator : queue.getPriorities()) {
+             addIfAppropriate(searchStr, comparator.getClass().getName(), "Prioritizer", matches);
+         }
+         
+         // search expiration
+         if (StringUtils.containsIgnoreCase("expires", searchStr) || StringUtils.containsIgnoreCase("expiration", searchStr)) {
+             final int expirationMillis = connection.getFlowFileQueue().getFlowFileExpiration(TimeUnit.MILLISECONDS);
+             if (expirationMillis > 0) {
+                 matches.add("FlowFile expiration: " + connection.getFlowFileQueue().getFlowFileExpiration());
+             }
+         }
+         
+         // search back pressure
+         if (StringUtils.containsIgnoreCase("back pressure", searchStr) || StringUtils.containsIgnoreCase("pressure", searchStr)) {
+             final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
+             final Double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+             if (backPressureBytes > 0) {
+                 matches.add("Back pressure data size: " + backPressureDataSize);
+             }
+ 
+             final long backPressureCount = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+             if (backPressureCount > 0) {
+                 matches.add("Back pressure count: " + backPressureCount);
+             }
+         }
+ 
+         // search the source
+         final Connectable source = connection.getSource();
+         addIfAppropriate(searchStr, source.getIdentifier(), "Source id", matches);
+         addIfAppropriate(searchStr, source.getName(), "Source name", matches);
+         addIfAppropriate(searchStr, source.getComments(), "Source comments", matches);
+ 
+         // search the destination
+         final Connectable destination = connection.getDestination();
+         addIfAppropriate(searchStr, destination.getIdentifier(), "Destination id", matches);
+         addIfAppropriate(searchStr, destination.getName(), "Destination name", matches);
+         addIfAppropriate(searchStr, destination.getComments(), "Destination comments", matches);
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO result = new ComponentSearchResultDTO();
+         result.setId(connection.getIdentifier());
+ 
+         // determine the name of the search match
+         if (StringUtils.isNotBlank(connection.getName())) {
+             result.setName(connection.getName());
+         } else if (!connection.getRelationships().isEmpty()) {
+             final List<String> relationships = new ArrayList<>(connection.getRelationships().size());
+             for (final Relationship relationship : connection.getRelationships()) {
+                 if (StringUtils.isNotBlank(relationship.getName())) {
+                     relationships.add(relationship.getName());
+                 }
+             }
+             if (!relationships.isEmpty()) {
+                 result.setName(StringUtils.join(relationships, ", "));
+             }
+         }
+ 
+         // ensure a name is added
+         if (result.getName() == null) {
+             result.setName("From source " + connection.getSource().getName());
+         }
+ 
+         result.setMatches(matches);
+         return result;
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final RemoteProcessGroup group) {
+         final List<String> matches = new ArrayList<>();
+         addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
+         addIfAppropriate(searchStr, group.getName(), "Name", matches);
+         addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
+         addIfAppropriate(searchStr, group.getTargetUri().toString(), "URL", matches);
+ 
+         // consider the transmission status
+         if ((StringUtils.containsIgnoreCase("transmitting", searchStr) || StringUtils.containsIgnoreCase("transmission enabled", searchStr)) && group.isTransmitting()) {
+             matches.add("Transmission: On");
+         } else if ((StringUtils.containsIgnoreCase("not transmitting", searchStr) || StringUtils.containsIgnoreCase("transmission disabled", searchStr)) && !group.isTransmitting()) {
+             matches.add("Transmission: Off");
+         }
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO result = new ComponentSearchResultDTO();
+         result.setId(group.getIdentifier());
+         result.setName(group.getName());
+         result.setMatches(matches);
+         return result;
+     }
+ 
+     private ComponentSearchResultDTO search(final String searchStr, final Funnel funnel) {
+         final List<String> matches = new ArrayList<>();
+         addIfAppropriate(searchStr, funnel.getIdentifier(), "Id", matches);
+ 
+         if (matches.isEmpty()) {
+             return null;
+         }
+ 
+         final ComponentSearchResultDTO dto = new ComponentSearchResultDTO();
+         dto.setId(funnel.getIdentifier());
+         dto.setName(funnel.getName());
+         dto.setMatches(matches);
+         return dto;
+     }
+ 
+     private void addIfAppropriate(final String searchStr, final String value, final String label, final List<String> matches) {
+         if (StringUtils.containsIgnoreCase(value, searchStr)) {
+             matches.add(label + ": " + value);
+         }
+     }
+ 
+     /*
+      * setters
+      */
+     public void setFlowController(FlowController flowController) {
+         this.flowController = flowController;
+     }
+ 
+     public void setProperties(NiFiProperties properties) {
+         this.properties = properties;
+     }
+ 
+     public void setUserService(UserService userService) {
+         this.userService = userService;
+     }
+ 
+     public void setFlowService(FlowService flowService) {
+         this.flowService = flowService;
+     }
+ 
+     public void setDtoFactory(DtoFactory dtoFactory) {
+         this.dtoFactory = dtoFactory;
+     }
+ }