You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/07/04 16:55:17 UTC

[3/6] nifi-minifi git commit: MINIFI-38 Removing reliance on JettyServer in order to add a flow status reporting end point. This also removes the UI and adds a 'flowstatus' option to minifi.sh to get information on the current flow from the terminal.

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
new file mode 100644
index 0000000..061120c
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.status;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.status.common.BulletinStatus;
+import org.apache.nifi.minifi.commons.status.common.ValidationError;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionHealth;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionStats;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionStatusBean;
+import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceHealth;
+import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceStatus;
+import org.apache.nifi.minifi.commons.status.instance.InstanceHealth;
+import org.apache.nifi.minifi.commons.status.instance.InstanceStats;
+import org.apache.nifi.minifi.commons.status.instance.InstanceStatus;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorHealth;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorStats;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorStatusBean;
+import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskHealth;
+import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskStatus;
+import org.apache.nifi.minifi.commons.status.rpg.InputPortStatus;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupHealth;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStats;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStatusBean;
+import org.apache.nifi.minifi.commons.status.system.ContentRepositoryUsage;
+import org.apache.nifi.minifi.commons.status.system.FlowfileRepositoryUsage;
+import org.apache.nifi.minifi.commons.status.system.GarbageCollectionStatus;
+import org.apache.nifi.minifi.commons.status.system.HeapStatus;
+import org.apache.nifi.minifi.commons.status.system.SystemDiagnosticsStatus;
+import org.apache.nifi.minifi.commons.status.system.SystemProcessorStats;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class StatusRequestParser {
+    private StatusRequestParser() {
+    }
+
+    static ProcessorStatusBean parseProcessorStatusRequest(ProcessorStatus inputProcessorStatus, String statusTypes, FlowController flowController, Collection<ValidationResult> validationResults) {
+        ProcessorStatusBean processorStatusBean = new ProcessorStatusBean();
+        processorStatusBean.setName(inputProcessorStatus.getName());
+
+        String[] statusSplits = statusTypes.split(",");
+        List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+                new BulletinQuery.Builder()
+                        .sourceIdMatches(inputProcessorStatus.getId())
+                        .build());
+
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    ProcessorHealth processorHealth = new ProcessorHealth();
+
+                    processorHealth.setRunStatus(inputProcessorStatus.getRunStatus().name());
+                    processorHealth.setHasBulletins(!bulletinList.isEmpty());
+                    processorHealth.setValidationErrorList(transformValidationResults(validationResults));
+
+                    processorStatusBean.setProcessorHealth(processorHealth);
+                    break;
+                case "bulletins":
+                    processorStatusBean.setBulletinList(transformBulletins(bulletinList));
+                    break;
+                case "stats":
+                    ProcessorStats processorStats = new ProcessorStats();
+
+                    processorStats.setActiveThreads(inputProcessorStatus.getActiveThreadCount());
+                    processorStats.setFlowfilesReceived(inputProcessorStatus.getFlowFilesReceived());
+                    processorStats.setBytesRead(inputProcessorStatus.getBytesRead());
+                    processorStats.setBytesWritten(inputProcessorStatus.getBytesWritten());
+                    processorStats.setFlowfilesSent(inputProcessorStatus.getFlowFilesSent());
+                    processorStats.setInvocations(inputProcessorStatus.getInvocations());
+                    processorStats.setProcessingNanos(inputProcessorStatus.getProcessingNanos());
+
+                    processorStatusBean.setProcessorStats(processorStats);
+                    break;
+            }
+        }
+        return processorStatusBean;
+    }
+
+    static RemoteProcessGroupStatusBean parseRemoteProcessGroupStatusRequest(RemoteProcessGroupStatus inputRemoteProcessGroupStatus, String statusTypes, FlowController flowController) {
+        RemoteProcessGroupStatusBean remoteProcessGroupStatusBean = new RemoteProcessGroupStatusBean();
+        remoteProcessGroupStatusBean.setName(inputRemoteProcessGroupStatus.getName());
+
+        String rootGroupId = flowController.getRootGroupId();
+        String[] statusSplits = statusTypes.split(",");
+
+        List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+                new BulletinQuery.Builder()
+                        .sourceIdMatches(inputRemoteProcessGroupStatus.getId())
+                        .build());
+        List<String> authorizationIssues = inputRemoteProcessGroupStatus.getAuthorizationIssues();
+
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    RemoteProcessGroupHealth remoteProcessGroupHealth = new RemoteProcessGroupHealth();
+
+                    remoteProcessGroupHealth.setTransmissionStatus(inputRemoteProcessGroupStatus.getTransmissionStatus().name());
+                    remoteProcessGroupHealth.setHasAuthorizationIssues(!authorizationIssues.isEmpty());
+                    remoteProcessGroupHealth.setActivePortCount(inputRemoteProcessGroupStatus.getActiveRemotePortCount());
+                    remoteProcessGroupHealth.setInactivePortCount(inputRemoteProcessGroupStatus.getInactiveRemotePortCount());
+                    remoteProcessGroupHealth.setHasBulletins(!bulletinList.isEmpty());
+
+                    remoteProcessGroupStatusBean.setRemoteProcessGroupHealth(remoteProcessGroupHealth);
+                    break;
+                case "bulletins":
+                    remoteProcessGroupStatusBean.setBulletinList(transformBulletins(bulletinList));
+                    break;
+                case "authorizationissues":
+                    remoteProcessGroupStatusBean.setAuthorizationIssues(authorizationIssues);
+                    break;
+                case "inputports":
+                    List<InputPortStatus> inputPortStatusList = new LinkedList<>();
+                    RemoteProcessGroup remoteProcessGroup = flowController.getGroup(rootGroupId).getRemoteProcessGroup(inputRemoteProcessGroupStatus.getId());
+                    Set<RemoteGroupPort> inputPorts = remoteProcessGroup.getInputPorts();
+
+                    for (RemoteGroupPort inputPort : inputPorts) {
+                        InputPortStatus inputPortStatus = new InputPortStatus();
+
+                        inputPortStatus.setName(inputPort.getName());
+                        inputPortStatus.setTargetExists(inputPort.getTargetExists());
+                        inputPortStatus.setTargetRunning(inputPort.isTargetRunning());
+
+                        inputPortStatusList.add(inputPortStatus);
+                    }
+                    remoteProcessGroupStatusBean.setInputPortStatusList(inputPortStatusList);
+                    break;
+                case "stats":
+                    RemoteProcessGroupStats remoteProcessGroupStats = new RemoteProcessGroupStats();
+
+                    remoteProcessGroupStats.setActiveThreads(inputRemoteProcessGroupStatus.getActiveThreadCount());
+                    remoteProcessGroupStats.setSentContentSize(inputRemoteProcessGroupStatus.getSentContentSize());
+                    remoteProcessGroupStats.setSentCount(inputRemoteProcessGroupStatus.getSentCount());
+
+                    remoteProcessGroupStatusBean.setRemoteProcessGroupStats(remoteProcessGroupStats);
+                    break;
+            }
+        }
+        return remoteProcessGroupStatusBean;
+    }
+
+    static ConnectionStatusBean parseConnectionStatusRequest(ConnectionStatus inputConnectionStatus, String statusTypes, Logger logger) {
+        ConnectionStatusBean connectionStatusBean = new ConnectionStatusBean();
+        connectionStatusBean.setName(inputConnectionStatus.getName());
+
+        String[] statusSplits = statusTypes.split(",");
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    ConnectionHealth connectionHealth = new ConnectionHealth();
+
+                    connectionHealth.setQueuedBytes(inputConnectionStatus.getQueuedBytes());
+                    connectionHealth.setQueuedCount(inputConnectionStatus.getQueuedCount());
+
+                    connectionStatusBean.setConnectionHealth(connectionHealth);
+                    break;
+                case "stats":
+                    ConnectionStats connectionStats = new ConnectionStats();
+
+                    connectionStats.setInputBytes(inputConnectionStatus.getInputBytes());
+                    connectionStats.setInputCount(inputConnectionStatus.getInputCount());
+                    connectionStats.setOutputCount(inputConnectionStatus.getOutputCount());
+                    connectionStats.setOutputBytes(inputConnectionStatus.getOutputBytes());
+
+                    connectionStatusBean.setConnectionStats(connectionStats);
+                    break;
+            }
+        }
+        return connectionStatusBean;
+    }
+
+    static ReportingTaskStatus parseReportingTaskStatusRequest(String id, ReportingTaskNode reportingTaskNode, String statusTypes, FlowController flowController, Logger logger) {
+        ReportingTaskStatus reportingTaskStatus = new ReportingTaskStatus();
+        reportingTaskStatus.setName(id);
+
+        String[] statusSplits = statusTypes.split(",");
+        List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+                new BulletinQuery.Builder()
+                        .sourceIdMatches(id)
+                        .build());
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    ReportingTaskHealth reportingTaskHealth = new ReportingTaskHealth();
+
+                    reportingTaskHealth.setScheduledState(reportingTaskNode.getScheduledState().name());
+                    reportingTaskHealth.setActiveThreads(reportingTaskNode.getActiveThreadCount());
+                    reportingTaskHealth.setHasBulletins(!bulletinList.isEmpty());
+
+                    Collection<ValidationResult> validationResults = reportingTaskNode.getValidationErrors();
+                    reportingTaskHealth.setValidationErrorList(transformValidationResults(validationResults));
+
+                    reportingTaskStatus.setReportingTaskHealth(reportingTaskHealth);
+                    break;
+                case "bulletins":
+                    reportingTaskStatus.setBulletinList(transformBulletins(bulletinList));
+                    break;
+            }
+        }
+        return reportingTaskStatus;
+    }
+
+    static ControllerServiceStatus parseControllerServiceStatusRequest(ControllerServiceNode controllerServiceNode, String statusTypes, FlowController flowController, Logger logger) {
+        ControllerServiceStatus controllerServiceStatus = new ControllerServiceStatus();
+        String id = controllerServiceNode.getIdentifier();
+        controllerServiceStatus.setName(id);
+
+        String[] statusSplits = statusTypes.split(",");
+        List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+                new BulletinQuery.Builder()
+                        .sourceIdMatches(id)
+                        .build());
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    ControllerServiceHealth controllerServiceHealth = new ControllerServiceHealth();
+
+                    controllerServiceHealth.setState(controllerServiceNode.getState().name());
+                    controllerServiceHealth.setHasBulletins(!bulletinList.isEmpty());
+
+                    Collection<ValidationResult> validationResults = controllerServiceNode.getValidationErrors();
+                    controllerServiceHealth.setValidationErrorList(transformValidationResults(validationResults));
+
+                    controllerServiceStatus.setControllerServiceHealth(controllerServiceHealth);
+                    break;
+                case "bulletins":
+                    controllerServiceStatus.setBulletinList(transformBulletins(bulletinList));
+                    break;
+            }
+        }
+        return controllerServiceStatus;
+    }
+
+    static SystemDiagnosticsStatus parseSystemDiagnosticsRequest(SystemDiagnostics inputSystemDiagnostics, String statusTypes) throws StatusRequestException {
+        if (inputSystemDiagnostics == null) {
+            throw new StatusRequestException("Unable to get system diagnostics");
+        }
+
+        SystemDiagnosticsStatus systemDiagnosticsStatus = new SystemDiagnosticsStatus();
+        String[] statusSplits = statusTypes.split(",");
+
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "heap":
+                    HeapStatus heapStatus = new HeapStatus();
+                    heapStatus.setTotalHeap(inputSystemDiagnostics.getTotalHeap());
+                    heapStatus.setMaxHeap(inputSystemDiagnostics.getMaxHeap());
+                    heapStatus.setFreeHeap(inputSystemDiagnostics.getFreeHeap());
+                    heapStatus.setUsedHeap(inputSystemDiagnostics.getUsedHeap());
+                    heapStatus.setHeapUtilization(inputSystemDiagnostics.getHeapUtilization());
+                    heapStatus.setTotalNonHeap(inputSystemDiagnostics.getTotalNonHeap());
+                    heapStatus.setMaxNonHeap(inputSystemDiagnostics.getMaxNonHeap());
+                    heapStatus.setFreeNonHeap(inputSystemDiagnostics.getFreeNonHeap());
+                    heapStatus.setUsedNonHeap(inputSystemDiagnostics.getUsedNonHeap());
+                    heapStatus.setNonHeapUtilization(inputSystemDiagnostics.getNonHeapUtilization());
+                    systemDiagnosticsStatus.setHeapStatus(heapStatus);
+                    break;
+                case "processorstats":
+                    SystemProcessorStats systemProcessorStats = new SystemProcessorStats();
+                    systemProcessorStats.setAvailableProcessors(inputSystemDiagnostics.getAvailableProcessors());
+                    systemProcessorStats.setLoadAverage(inputSystemDiagnostics.getProcessorLoadAverage());
+                    systemDiagnosticsStatus.setProcessorStatus(systemProcessorStats);
+                    break;
+                case "contentrepositoryusage":
+                    List<ContentRepositoryUsage> contentRepositoryUsageList = new LinkedList<>();
+                    Map<String, StorageUsage> contentRepoStorage = inputSystemDiagnostics.getContentRepositoryStorageUsage();
+
+                    for (Map.Entry<String, StorageUsage> stringStorageUsageEntry : contentRepoStorage.entrySet()) {
+                        ContentRepositoryUsage contentRepositoryUsage = new ContentRepositoryUsage();
+                        StorageUsage storageUsage = stringStorageUsageEntry.getValue();
+
+                        contentRepositoryUsage.setName(storageUsage.getIdentifier());
+                        contentRepositoryUsage.setFreeSpace(storageUsage.getFreeSpace());
+                        contentRepositoryUsage.setTotalSpace(storageUsage.getTotalSpace());
+                        contentRepositoryUsage.setDiskUtilization(storageUsage.getDiskUtilization());
+                        contentRepositoryUsage.setUsedSpace(storageUsage.getUsedSpace());
+
+                        contentRepositoryUsageList.add(contentRepositoryUsage);
+                    }
+                    systemDiagnosticsStatus.setContentRepositoryUsageList(contentRepositoryUsageList);
+                    break;
+                case "flowfilerepositoryusage":
+                    FlowfileRepositoryUsage flowfileRepositoryUsage = new FlowfileRepositoryUsage();
+                    StorageUsage flowFileRepoStorage = inputSystemDiagnostics.getFlowFileRepositoryStorageUsage();
+
+                    flowfileRepositoryUsage.setFreeSpace(flowFileRepoStorage.getFreeSpace());
+                    flowfileRepositoryUsage.setTotalSpace(flowFileRepoStorage.getTotalSpace());
+                    flowfileRepositoryUsage.setDiskUtilization(flowFileRepoStorage.getDiskUtilization());
+                    flowfileRepositoryUsage.setUsedSpace(flowFileRepoStorage.getUsedSpace());
+
+                    systemDiagnosticsStatus.setFlowfileRepositoryUsage(flowfileRepositoryUsage);
+                    break;
+                case "garbagecollection":
+                    List<GarbageCollectionStatus> garbageCollectionStatusList = new LinkedList<>();
+                    Map<String, GarbageCollection> garbageCollectionMap = inputSystemDiagnostics.getGarbageCollection();
+
+                    for (Map.Entry<String, GarbageCollection> stringGarbageCollectionEntry : garbageCollectionMap.entrySet()) {
+                        GarbageCollectionStatus garbageCollectionStatus = new GarbageCollectionStatus();
+                        GarbageCollection garbageCollection = stringGarbageCollectionEntry.getValue();
+
+                        garbageCollectionStatus.setName(garbageCollection.getName());
+                        garbageCollectionStatus.setCollectionCount(garbageCollection.getCollectionCount());
+                        garbageCollectionStatus.setCollectionTime(garbageCollection.getCollectionTime());
+
+                        garbageCollectionStatusList.add(garbageCollectionStatus);
+                    }
+                    systemDiagnosticsStatus.setGarbageCollectionStatusList(garbageCollectionStatusList);
+                    break;
+            }
+        }
+        return systemDiagnosticsStatus;
+    }
+
+    static InstanceStatus parseInstanceRequest(String statusTypes, FlowController flowController, ProcessGroupStatus rootGroupStatus) {
+        InstanceStatus instanceStatus = new InstanceStatus();
+
+        flowController.getAllControllerServices();
+        List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletinsForController();
+        String[] statusSplits = statusTypes.split(",");
+
+        for (String statusType : statusSplits) {
+            switch (statusType.toLowerCase().trim()) {
+                case "health":
+                    InstanceHealth instanceHealth = new InstanceHealth();
+
+                    instanceHealth.setQueuedCount(rootGroupStatus.getQueuedCount());
+                    instanceHealth.setQueuedContentSize(rootGroupStatus.getQueuedContentSize());
+                    instanceHealth.setHasBulletins(!bulletinList.isEmpty());
+                    instanceHealth.setActiveThreads(rootGroupStatus.getActiveThreadCount());
+
+                    instanceStatus.setInstanceHealth(instanceHealth);
+                    break;
+                case "bulletins":
+                    instanceStatus.setBulletinList(transformBulletins(flowController.getBulletinRepository().findBulletinsForController()));
+                    break;
+                case "stats":
+                    InstanceStats instanceStats = new InstanceStats();
+
+                    instanceStats.setBytesRead(rootGroupStatus.getBytesRead());
+                    instanceStats.setBytesWritten(rootGroupStatus.getBytesWritten());
+                    instanceStats.setBytesSent(rootGroupStatus.getBytesSent());
+                    instanceStats.setFlowfilesSent(rootGroupStatus.getFlowFilesSent());
+                    instanceStats.setBytesTransferred(rootGroupStatus.getBytesTransferred());
+                    instanceStats.setFlowfilesTransferred(rootGroupStatus.getFlowFilesTransferred());
+                    instanceStats.setBytesReceived(rootGroupStatus.getBytesReceived());
+                    instanceStats.setFlowfilesReceived(rootGroupStatus.getFlowFilesReceived());
+
+                    instanceStatus.setInstanceStats(instanceStats);
+                    break;
+            }
+        }
+        return instanceStatus;
+    }
+
+    private static List<ValidationError> transformValidationResults(Collection<ValidationResult> validationResults) {
+        List<ValidationError> validationErrorList = new LinkedList<>();
+        for (ValidationResult validationResult : validationResults) {
+            if (!validationResult.isValid()) {
+                ValidationError validationError = new ValidationError();
+                validationError.setSubject(validationResult.getSubject());
+                validationError.setInput(validationResult.getInput());
+                validationError.setReason(validationResult.getExplanation());
+
+                validationErrorList.add(validationError);
+            }
+        }
+        return validationErrorList;
+    }
+
+    private static List<BulletinStatus> transformBulletins(List<Bulletin> bulletinList) {
+        List<BulletinStatus> bulletinStatusList = new LinkedList<>();
+        if (!bulletinList.isEmpty()) {
+            for (Bulletin bulletin : bulletinList) {
+                BulletinStatus bulletinStatus = new BulletinStatus();
+                bulletinStatus.setMessage(bulletin.getMessage());
+                bulletinStatus.setTimestamp(bulletin.getTimestamp());
+                bulletinStatusList.add(bulletinStatus);
+            }
+        }
+        return bulletinStatusList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
new file mode 100644
index 0000000..027d685
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.status;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStatusConfigReporter {
+    private FlowController mockFlowController;
+    private ProcessGroupStatus rootGroupStatus;
+    private BulletinRepository bulletinRepo;
+    private ProcessGroup processGroup;
+
+    @Before
+    public void setup() {
+        mockFlowController = mock(FlowController.class);
+        rootGroupStatus = mock(ProcessGroupStatus.class);
+        bulletinRepo = mock(BulletinRepository.class);
+        processGroup = mock(ProcessGroup.class);
+
+        when(mockFlowController.getRootGroupId()).thenReturn("root");
+        when(mockFlowController.getGroupStatus("root")).thenReturn(rootGroupStatus);
+        when(mockFlowController.getControllerStatus()).thenReturn(rootGroupStatus);
+        when(mockFlowController.getBulletinRepository()).thenReturn(bulletinRepo);
+        when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
+    }
+
+    @Test
+    public void processorStatusHealth() throws Exception {
+        populateProcessor(false, false);
+
+        String statusRequest = "processor:all:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addProcessorStatus(expected, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void processorStatusWithValidationErrors() throws Exception {
+        populateProcessor(true, false);
+
+        String statusRequest = "processor:all:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addProcessorStatus(expected, true, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void processorStatusAll() throws Exception {
+        populateProcessor(true, true);
+
+        String statusRequest = "processor:all:health, stats, bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addProcessorStatus(expected, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void connectionStatusHealth() throws Exception {
+        populateConnection();
+
+        String statusRequest = "connection:all:health";
+        FlowStatusReport status = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addConnectionStatus(expected, true, false);
+
+        assertEquals(expected, status);
+    }
+
+
+    @Test
+    public void connectionStatusAll() throws Exception {
+        populateConnection();
+
+        String statusRequest = "connection:all:health, stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+        addConnectionStatus(expected, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void connectionAndProcessorStatusHealth() throws Exception {
+
+        populateConnection();
+
+        populateProcessor(false, false);
+
+        String statusRequest = "connection:connectionId:health; processor:UpdateAttributeProcessorId:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        addConnectionStatus(expected, true, false);
+
+        addProcessorStatus(expected, true, false, false, false, false);
+
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void provenanceReportingTaskStatusHealth() throws Exception {
+        populateReportingTask(false, false);
+
+        String statusRequest = "provenanceReporting:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+        addReportingTaskStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+    @Test
+    public void provenanceReportingTaskStatusBulletins() throws Exception {
+        populateReportingTask(true, false);
+
+        String statusRequest = "provenanceReporting:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addReportingTaskStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void provenanceReportingTaskStatusAll() throws Exception {
+        populateReportingTask(true, true);
+
+        String statusRequest = "provenanceReporting:health,bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addReportingTaskStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticHeap() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:heap";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticProcessorStats() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:processorStats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, false, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticFlowFileRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:flowfilerepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticContentRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:contentrepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, false, false, false, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticGarbageCollection() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:garbagecollection";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, false, false, false, false, true);
+
+        assertEquals(expected, actual);
+    }
+
+
+    @Test
+    public void systemDiagnosticAll() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:garbagecollection, heap, processorstats, contentrepositoryusage, flowfilerepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusHealth() throws Exception {
+        populateInstance(false);
+
+        String statusRequest = "instance:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+        addInstanceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusBulletins() throws Exception {
+        populateInstance(true);
+
+        String statusRequest = "instance:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addInstanceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusStats() throws Exception {
+        populateInstance(false);
+
+        String statusRequest = "instance:stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addInstanceStatus(expected, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusAll() throws Exception {
+        populateInstance(true);
+
+        String statusRequest = "instance:stats, bulletins, health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addInstanceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusHealth() throws Exception {
+        populateControllerService(false, false);
+
+        String statusRequest = "controllerServices:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addControllerServiceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusBulletins() throws Exception {
+        populateControllerService(false, true);
+
+        String statusRequest = "controllerServices:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addControllerServiceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusAll() throws Exception {
+        populateControllerService(true, true);
+
+        String statusRequest = "controllerServices:bulletins, health";
+
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addControllerServiceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusHealth() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, true, false, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusBulletins() throws Exception {
+        populateRemoteProcessGroup(true, false);
+
+        String statusRequest = "remoteProcessGroup:all:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusAuthorizationIssues() throws Exception {
+        populateRemoteProcessGroup(false, true);
+
+        String statusRequest = "remoteProcessGroup:all:authorizationissues";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, false, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusInputPorts() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:inputPorts";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusStats() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+    @Test
+    public void remoteProcessGroupStatusAll() throws Exception {
+        populateRemoteProcessGroup(true, true);
+
+        String statusRequest = "remoteProcessGroup:all:health, authorizationissues, bulletins, inputPorts, stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void statusEverything() throws Exception {
+        when(bulletinRepo.findBulletins(anyObject())).thenReturn(Collections.emptyList());
+
+        populateControllerService(true, false);
+        populateInstance(true);
+        populateSystemDiagnostics();
+        populateReportingTask(false, true);
+        populateConnection();
+        populateProcessor(true, false);
+        populateRemoteProcessGroup(false, true);
+
+        String statusRequest = "controllerServices:bulletins,health; processor:all:health,stats,bulletins; instance:bulletins,health,stats ; systemDiagnostics:garbagecollection, heap, " +
+                "processorstats, contentrepositoryusage, flowfilerepositoryusage; connection:all:health,stats; provenanceReporting:health,bulletins; remoteProcessGroup:all:health, " +
+                "authorizationissues, bulletins, inputPorts, stats";
+
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+        addControllerServiceStatus(expected, true, true, true, false);
+        addInstanceStatus(expected, true, true, true, true);
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+        addReportingTaskStatus(expected, true, true, true, false);
+        addConnectionStatus(expected, true, true);
+        addProcessorStatus(expected, true, true, true, true, false);
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+    /***************************
+     * Populator methods
+     *************************/
+
+    private void addBulletinsToInstance() {
+        Bulletin bulletin = mock(Bulletin.class);
+        when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
+        when(bulletin.getMessage()).thenReturn("Bulletin message");
+
+        List<Bulletin> bulletinList = new ArrayList<>();
+        bulletinList.add(bulletin);
+
+        when(bulletinRepo.findBulletinsForController()).thenReturn(bulletinList);
+    }
+
+    private void populateSystemDiagnostics() {
+        SystemDiagnostics systemDiagnostics = new SystemDiagnostics();
+        addGarbageCollectionToSystemDiagnostics(systemDiagnostics);
+        addHeapSystemDiagnostics(systemDiagnostics);
+        addContentRepoToSystemDiagnostics(systemDiagnostics);
+        addFlowFileRepoToSystemDiagnostics(systemDiagnostics);
+        addProcessorInfoToSystemDiagnostics(systemDiagnostics);
+        when(mockFlowController.getSystemDiagnostics()).thenReturn(systemDiagnostics);
+    }
+
+    private void populateControllerService(boolean validationErrors, boolean addBulletins) {
+        ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
+        addControllerServiceHealth(controllerServiceNode);
+        if (validationErrors) {
+            addValidationErrors(controllerServiceNode);
+        }
+
+        if (addBulletins) {
+            addBulletins("Bulletin message", controllerServiceNode.getIdentifier());
+        }
+        HashSet<ControllerServiceNode> controllerServiceNodes = new HashSet<>();
+        controllerServiceNodes.add(controllerServiceNode);
+        when(mockFlowController.getAllControllerServices()).thenReturn(controllerServiceNodes);
+    }
+
+    private void populateInstance(boolean addBulletins) {
+        setRootGroupStatusVariables();
+        if (addBulletins) {
+            addBulletinsToInstance();
+        }
+    }
+
+    private void populateReportingTask(boolean addBulletins, boolean validationErrors) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "ReportProvenance");
+        }
+
+        ReportingTaskNode reportingTaskNode = mock(ReportingTaskNode.class);
+        addReportingTaskNodeVariables(reportingTaskNode);
+
+        HashSet<ReportingTaskNode> reportingTaskNodes = new HashSet<>();
+        reportingTaskNodes.add(reportingTaskNode);
+
+        when(mockFlowController.getAllReportingTasks()).thenReturn(reportingTaskNodes);
+
+        if (validationErrors) {
+            ValidationResult validationResult = new ValidationResult.Builder()
+                    .input("input")
+                    .subject("subject")
+                    .explanation("is not valid")
+                    .build();
+
+            ValidationResult validationResult2 = new ValidationResult.Builder()
+                    .input("input2")
+                    .subject("subject2")
+                    .explanation("is not valid too")
+                    .build();
+
+            List<ValidationResult> validationResultList = new ArrayList<>();
+            validationResultList.add(validationResult);
+            validationResultList.add(validationResult2);
+
+            when(reportingTaskNode.getValidationErrors()).thenReturn(validationResultList);
+        } else {
+            when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST);
+        }
+    }
+
+    private void populateConnection() {
+        ConnectionStatus connectionStatus = new ConnectionStatus();
+        connectionStatus.setQueuedBytes(100);
+        connectionStatus.setId("connectionId");
+        connectionStatus.setName("connectionId");
+        connectionStatus.setQueuedCount(10);
+        connectionStatus.setInputCount(1);
+        connectionStatus.setInputBytes(2);
+        connectionStatus.setOutputCount(3);
+        connectionStatus.setOutputBytes(4);
+
+        Collection<ConnectionStatus> statusCollection = new ArrayList<>();
+        statusCollection.add(connectionStatus);
+
+        when(rootGroupStatus.getConnectionStatus()).thenReturn(statusCollection);
+    }
+
+    private void populateProcessor(boolean validationErrors, boolean addBulletins) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "UpdateAttributeProcessorId");
+        }
+
+        ProcessorStatus processorStatus = new ProcessorStatus();
+        processorStatus.setType("org.apache.nifi.processors.attributes.UpdateAttribute");
+        processorStatus.setId("UpdateAttributeProcessorId");
+        processorStatus.setName("UpdateAttributeProcessorId");
+        processorStatus.setRunStatus(RunStatus.Stopped);
+        processorStatus.setActiveThreadCount(1);
+        processorStatus.setFlowFilesReceived(2);
+        processorStatus.setBytesRead(3);
+        processorStatus.setBytesWritten(4);
+        processorStatus.setFlowFilesSent(5);
+        processorStatus.setInvocations(6);
+        processorStatus.setProcessingNanos(7);
+
+        Collection<ProcessorStatus> statusCollection = new ArrayList<>();
+        statusCollection.add(processorStatus);
+
+        mockProcessorEmptyValidation(processorStatus.getId(), processGroup);
+        when(rootGroupStatus.getProcessorStatus()).thenReturn(statusCollection);
+
+        ProcessorNode processorNode = mock(ProcessorNode.class);
+        when(processGroup.getProcessor(processorStatus.getId())).thenReturn(processorNode);
+
+        if (validationErrors) {
+            ValidationResult validationResult = new ValidationResult.Builder()
+                    .input("input")
+                    .subject("subject")
+                    .explanation("is not valid")
+                    .build();
+
+            ValidationResult validationResult2 = new ValidationResult.Builder()
+                    .input("input2")
+                    .subject("subject2")
+                    .explanation("is not valid too")
+                    .build();
+
+            List<ValidationResult> validationResultList = new ArrayList<>();
+            validationResultList.add(validationResult);
+            validationResultList.add(validationResult2);
+
+            when(processorNode.getValidationErrors()).thenReturn(validationResultList);
+        } else {
+            when(processorNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST);
+        }
+    }
+
+    private void populateRemoteProcessGroup(boolean addBulletins, boolean addAuthIssues) {
+        when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
+
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        when(processGroup.getRemoteProcessGroup(any())).thenReturn(remoteProcessGroup);
+
+        RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
+        when(remoteGroupPort.getName()).thenReturn("inputPort");
+        when(remoteGroupPort.getTargetExists()).thenReturn(true);
+        when(remoteGroupPort.isTargetRunning()).thenReturn(false);
+
+        when(remoteProcessGroup.getInputPorts()).thenReturn(Collections.singleton(remoteGroupPort));
+
+        RemoteProcessGroupStatus remoteProcessGroupStatus = new RemoteProcessGroupStatus();
+        addRemoteProcessGroupStatus(remoteProcessGroupStatus);
+        if (addAuthIssues) {
+            remoteProcessGroupStatus.setAuthorizationIssues(Collections.singletonList("auth issue"));
+        } else {
+            remoteProcessGroupStatus.setAuthorizationIssues(Collections.EMPTY_LIST);
+        }
+        if (addBulletins) {
+            addBulletins("Bulletin message", remoteProcessGroupStatus.getId());
+        }
+        when(rootGroupStatus.getRemoteProcessGroupStatus()).thenReturn(Collections.singletonList(remoteProcessGroupStatus));
+    }
+
+
+    private void setRootGroupStatusVariables() {
+        when(rootGroupStatus.getQueuedContentSize()).thenReturn(1L);
+        when(rootGroupStatus.getQueuedCount()).thenReturn(2);
+        when(rootGroupStatus.getActiveThreadCount()).thenReturn(3);
+        when(rootGroupStatus.getBytesRead()).thenReturn(1L);
+        when(rootGroupStatus.getBytesWritten()).thenReturn(2L);
+        when(rootGroupStatus.getBytesSent()).thenReturn(3L);
+        when(rootGroupStatus.getFlowFilesSent()).thenReturn(4);
+        when(rootGroupStatus.getBytesTransferred()).thenReturn(5L);
+        when(rootGroupStatus.getFlowFilesTransferred()).thenReturn(6);
+        when(rootGroupStatus.getBytesReceived()).thenReturn(7L);
+        when(rootGroupStatus.getFlowFilesReceived()).thenReturn(8);
+    }
+
+    private void addGarbageCollectionToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        Map<String, GarbageCollection> garbageCollectionMap = new HashMap<>();
+
+        GarbageCollection garbageCollection1 = new GarbageCollection();
+        garbageCollection1.setCollectionCount(1);
+        garbageCollection1.setCollectionTime(10);
+        garbageCollection1.setName("garbage 1");
+        garbageCollectionMap.put(garbageCollection1.getName(), garbageCollection1);
+
+        systemDiagnostics.setGarbageCollection(garbageCollectionMap);
+    }
+
+    private void addContentRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        Map<String, StorageUsage> stringStorageUsageMap = new HashMap<>();
+
+        StorageUsage repoUsage1 = new StorageUsage();
+        repoUsage1.setFreeSpace(30);
+        repoUsage1.setTotalSpace(100);
+        repoUsage1.setIdentifier("Content repo1");
+        stringStorageUsageMap.put(repoUsage1.getIdentifier(), repoUsage1);
+
+        systemDiagnostics.setContentRepositoryStorageUsage(stringStorageUsageMap);
+    }
+
+    private void addFlowFileRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        StorageUsage repoUsage = new StorageUsage();
+        repoUsage.setFreeSpace(30);
+        repoUsage.setTotalSpace(100);
+        repoUsage.setIdentifier("FlowFile repo");
+        systemDiagnostics.setFlowFileRepositoryStorageUsage(repoUsage);
+    }
+
+    private void addHeapSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        systemDiagnostics.setMaxHeap(5);
+        systemDiagnostics.setTotalHeap(3);
+        systemDiagnostics.setUsedHeap(2);
+        systemDiagnostics.setMaxNonHeap(9);
+        systemDiagnostics.setTotalNonHeap(8);
+        systemDiagnostics.setUsedNonHeap(6);
+    }
+
+    private void addProcessorInfoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        systemDiagnostics.setProcessorLoadAverage(80.9);
+        systemDiagnostics.setAvailableProcessors(5);
+    }
+
+    private void mockProcessorEmptyValidation(String id, ProcessGroup processGroup) {
+        ProcessorNode processorNode = mock(ProcessorNode.class);
+        when(processGroup.getProcessor(id)).thenReturn(processorNode);
+        when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList());
+    }
+
+    private void addControllerServiceHealth(ControllerServiceNode controllerServiceNode) {
+        when(controllerServiceNode.getName()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getIdentifier()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getState()).thenReturn(ControllerServiceState.ENABLED);
+        when(controllerServiceNode.getValidationErrors()).thenReturn(Collections.emptyList());
+    }
+
+    private void addReportingTaskNodeVariables(ReportingTaskNode reportingTaskNode) {
+        when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        when(reportingTaskNode.getActiveThreadCount()).thenReturn(1);
+        when(reportingTaskNode.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+        when(reportingTaskNode.getIdentifier()).thenReturn("ReportProvenance");
+        when(reportingTaskNode.getName()).thenReturn("ReportProvenance");
+
+    }
+
+    private void addRemoteProcessGroupStatus(RemoteProcessGroupStatus remoteProcessGroupStatus) {
+        remoteProcessGroupStatus.setName("rpg1");
+        remoteProcessGroupStatus.setId("rpg1");
+        remoteProcessGroupStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
+        remoteProcessGroupStatus.setActiveRemotePortCount(1);
+        remoteProcessGroupStatus.setInactiveRemotePortCount(2);
+
+        remoteProcessGroupStatus.setActiveThreadCount(3);
+        remoteProcessGroupStatus.setSentContentSize(4L);
+        remoteProcessGroupStatus.setSentCount(5);
+    }
+
+    private void addBulletins(String message, String sourceId) {
+        Bulletin bulletin = mock(Bulletin.class);
+        when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
+        when(bulletin.getMessage()).thenReturn(message);
+
+        List<Bulletin> bulletinList = new ArrayList<>();
+        bulletinList.add(bulletin);
+
+        BulletinQueryAnswer bulletinQueryAnswer = new BulletinQueryAnswer(sourceId, bulletinList);
+        when(bulletinRepo.findBulletins(anyObject())).then(bulletinQueryAnswer);
+    }
+
+    private void addValidationErrors(ConfiguredComponent connectable) {
+        ValidationResult validationResult = new ValidationResult.Builder()
+                .input("input")
+                .subject("subject")
+                .explanation("is not valid")
+                .build();
+
+        ValidationResult validationResult2 = new ValidationResult.Builder()
+                .input("input2")
+                .subject("subject2")
+                .explanation("is not valid too")
+                .build();
+
+        List<ValidationResult> validationResultList = new ArrayList<>();
+        validationResultList.add(validationResult);
+        validationResultList.add(validationResult2);
+        when(connectable.getValidationErrors()).thenReturn(validationResultList);
+    }
+
+    private class BulletinQueryAnswer implements Answer {
+
+        final List<Bulletin> bulletinList;
+        String idToMatch = "";
+
+        private BulletinQueryAnswer(String idToMatch, List<Bulletin> bulletinList) {
+            this.idToMatch = idToMatch;
+            this.bulletinList = bulletinList;
+        }
+
+        @Override
+        public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+            BulletinQuery bulletinQuery = (BulletinQuery) invocationOnMock.getArguments()[0];
+            if (idToMatch.equals(bulletinQuery.getSourceIdPattern().toString())) {
+                return bulletinList;
+            }
+            return Collections.emptyList();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
new file mode 100644
index 0000000..c8b497b
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>minifi-framework</artifactId>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>minifi-nar-utils</artifactId>
+
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
new file mode 100644
index 0000000..db0b35e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.nifi.authentication.LoginIdentityProvider;
+
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.ReportingTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
+ *
+ * @ThreadSafe - is immutable
+ */
+@SuppressWarnings("rawtypes")
+public class ExtensionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExtensionManager.class);
+
+    // Maps a service definition (interface) to those classes that implement the interface
+    private static final Map<Class, Set<Class>> definitionMap = new HashMap<>();
+
+    private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>();
+
+    static {
+        definitionMap.put(Processor.class, new HashSet<Class>());
+        definitionMap.put(FlowFilePrioritizer.class, new HashSet<Class>());
+        definitionMap.put(ReportingTask.class, new HashSet<Class>());
+        definitionMap.put(ControllerService.class, new HashSet<Class>());
+        definitionMap.put(AuthorityProvider.class, new HashSet<Class>());
+        definitionMap.put(LoginIdentityProvider.class, new HashSet<Class>());
+        definitionMap.put(ProvenanceEventRepository.class, new HashSet<Class>());
+        definitionMap.put(ComponentStatusRepository.class, new HashSet<Class>());
+        definitionMap.put(FlowFileRepository.class, new HashSet<Class>());
+        definitionMap.put(FlowFileSwapManager.class, new HashSet<Class>());
+        definitionMap.put(ContentRepository.class, new HashSet<Class>());
+    }
+
+    /**
+     * Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath.
+     */
+    public static void discoverExtensions() {
+        final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
+
+        // get the current context class loader
+        ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+
+        // consider the system class loader
+        loadExtensions(systemClassLoader);
+
+        // consider each nar class loader
+        for (final ClassLoader ncl : NarClassLoaders.getExtensionClassLoaders()) {
+
+            // Must set the context class loader to the nar classloader itself
+            // so that static initialization techniques that depend on the context class loader will work properly
+            Thread.currentThread().setContextClassLoader(ncl);
+            loadExtensions(ncl);
+        }
+
+        // restore the current context class loader if appropriate
+        if (currentContextClassLoader != null) {
+            Thread.currentThread().setContextClassLoader(currentContextClassLoader);
+        }
+    }
+
+    /**
+     * Loads extensions from the specified class loader.
+     *
+     * @param classLoader from which to load extensions
+     */
+    @SuppressWarnings("unchecked")
+    private static void loadExtensions(final ClassLoader classLoader) {
+        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
+            final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), classLoader);
+
+            for (final Object o : serviceLoader) {
+                registerServiceClass(o.getClass(), extensionClassloaderLookup, classLoader, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Registers extension for the specified type from the specified ClassLoader.
+     *
+     * @param type the extension type
+     * @param classloaderMap mapping of classname to classloader
+     * @param classLoader the classloader being mapped to
+     * @param classes to map to this classloader but which come from its ancestors
+     */
+    private static void registerServiceClass(final Class<?> type, final Map<String, ClassLoader> classloaderMap, final ClassLoader classLoader, final Set<Class> classes) {
+        final String className = type.getName();
+        final ClassLoader registeredClassLoader = classloaderMap.get(className);
+
+        // see if this class is already registered (this should happen when the class is loaded by an ancestor of the specified classloader)
+        if (registeredClassLoader == null) {
+            classloaderMap.put(className, classLoader);
+            classes.add(type);
+        } else {
+            boolean loadedFromAncestor = false;
+
+            // determine if this class was loaded from an ancestor
+            ClassLoader ancestorClassLoader = classLoader.getParent();
+            while (ancestorClassLoader != null) {
+                if (ancestorClassLoader == registeredClassLoader) {
+                    loadedFromAncestor = true;
+                    break;
+                }
+                ancestorClassLoader = ancestorClassLoader.getParent();
+            }
+
+            // if this class was loaded from a non ancestor class loader, report potential unexpected behavior
+            if (!loadedFromAncestor) {
+                logger.warn("Attempt was made to load " + className + " from " + classLoader
+                        + " but that class name is already loaded/registered from " + registeredClassLoader
+                        + ".  This may cause unpredictable behavior.  Order of NARs is not guaranteed.");
+            }
+        }
+    }
+
+    /**
+     * Determines the effective classloader for classes of the given type. If returns null it indicates the given type is not known or was not detected.
+     *
+     * @param classType to lookup the classloader of
+     * @return String of fully qualified class name; null if not a detected type
+     */
+    public static ClassLoader getClassLoader(final String classType) {
+        return extensionClassloaderLookup.get(classType);
+    }
+
+    public static Set<Class> getExtensions(final Class<?> definition) {
+        final Set<Class> extensions = definitionMap.get(definition);
+        return (extensions == null) ? Collections.<Class>emptySet() : extensions;
+    }
+
+    public static void logClassLoaderMapping() {
+        final StringBuilder builder = new StringBuilder();
+
+        builder.append("Extension Type Mapping to Classloader:");
+        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
+            builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" type || Classloader ===");
+
+            for (final Class type : entry.getValue()) {
+                builder.append("\n\t").append(type.getName()).append(" || ").append(getClassLoader(type.getName()));
+            }
+
+            builder.append("\n\t=== End ").append(entry.getKey().getSimpleName()).append(" types ===");
+        }
+
+        logger.info(builder.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
new file mode 100644
index 0000000..c478d97
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class ExtensionMapping {
+
+    private final List<String> processorNames = new ArrayList<>();
+    private final List<String> controllerServiceNames = new ArrayList<>();
+    private final List<String> reportingTaskNames = new ArrayList<>();
+
+    void addProcessor(final String processorName) {
+        processorNames.add(processorName);
+    }
+
+    void addAllProcessors(final Collection<String> processorNames) {
+        this.processorNames.addAll(processorNames);
+    }
+
+    void addControllerService(final String controllerServiceName) {
+        controllerServiceNames.add(controllerServiceName);
+    }
+
+    void addAllControllerServices(final Collection<String> controllerServiceNames) {
+        this.controllerServiceNames.addAll(controllerServiceNames);
+    }
+
+    void addReportingTask(final String reportingTaskName) {
+        reportingTaskNames.add(reportingTaskName);
+    }
+
+    void addAllReportingTasks(final Collection<String> reportingTaskNames) {
+        this.reportingTaskNames.addAll(reportingTaskNames);
+    }
+
+    public List<String> getProcessorNames() {
+        return Collections.unmodifiableList(processorNames);
+    }
+
+    public List<String> getControllerServiceNames() {
+        return Collections.unmodifiableList(controllerServiceNames);
+    }
+
+    public List<String> getReportingTaskNames() {
+        return Collections.unmodifiableList(reportingTaskNames);
+    }
+
+    public List<String> getAllExtensionNames() {
+        final List<String> extensionNames = new ArrayList<>();
+        extensionNames.addAll(processorNames);
+        extensionNames.addAll(controllerServiceNames);
+        extensionNames.addAll(reportingTaskNames);
+        return extensionNames;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
new file mode 100644
index 0000000..f3be55b
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * <p>
+ * A <tt>ClassLoader</tt> for loading NARs (NiFi archives). NARs are designed to
+ * allow isolating bundles of code (comprising one-or-more NiFi
+ * <tt>FlowFileProcessor</tt>s, <tt>FlowFileComparator</tt>s and their
+ * dependencies) from other such bundles; this allows for dependencies and
+ * processors that require conflicting, incompatible versions of the same
+ * dependency to run in a single instance of NiFi.</p>
+ *
+ * <p>
+ * <tt>NarClassLoader</tt> follows the delegation model described in
+ * {@link ClassLoader#findClass(String) ClassLoader.findClass(...)};
+ * classes are first loaded from the parent <tt>ClassLoader</tt>, and only if
+ * they cannot be found there does the <tt>NarClassLoader</tt> provide a
+ * definition. Specifically, this means that resources are loaded from NiFi's
+ * <tt>conf</tt>
+ * and <tt>lib</tt> directories first, and if they cannot be found there, are
+ * loaded from the NAR.</p>
+ *
+ * <p>
+ * The packaging of a NAR is such that it is a ZIP file with the following
+ * directory structure:
+ *
+ * <pre>
+ *   +META-INF/
+ *   +-- bundled-dependencies/
+ *   +-- &lt;JAR files&gt;
+ *   +-- MANIFEST.MF
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * The MANIFEST.MF file contains the same information as a typical JAR file but
+ * also includes two additional NiFi properties: {@code Nar-Id} and
+ * {@code Nar-Dependency-Id}.
+ * </p>
+ *
+ * <p>
+ * The {@code Nar-Id} provides a unique identifier for this NAR.
+ * </p>
+ *
+ * <p>
+ * The {@code Nar-Dependency-Id} is optional. If provided, it indicates that
+ * this NAR should inherit all of the dependencies of the NAR with the provided
+ * ID. Often times, the NAR that is depended upon is referred to as the Parent.
+ * This is because its ClassLoader will be the parent ClassLoader of the
+ * dependent NAR.
+ * </p>
+ *
+ * <p>
+ * If a NAR is built using NiFi's Maven NAR Plugin, the {@code Nar-Id} property
+ * will be set to the artifactId of the NAR. The {@code Nar-Dependency-Id} will
+ * be set to the artifactId of the NAR that is depended upon. For example, if
+ * NAR A is defined as such:
+ *
+ * <pre>
+ * ...
+ * &lt;artifactId&gt;nar-a&lt;/artifactId&gt;
+ * &lt;packaging&gt;nar&lt;/packaging&gt;
+ * ...
+ * &lt;dependencies&gt;
+ *   &lt;dependency&gt;
+ *     &lt;groupId&gt;group&lt;/groupId&gt;
+ *     &lt;artifactId&gt;nar-z&lt;/artifactId&gt;
+ *     <b>&lt;type&gt;nar&lt;/type&gt;</b>
+ *   &lt;/dependency&gt;
+ * &lt;/dependencies&gt;
+ * </pre>
+ * </p>
+ *
+ *
+ * <p>
+ * Then the MANIFEST.MF file that is created for NAR A will have the following
+ * properties set:
+ * <ul>
+ * <li>{@code Nar-Id: nar-a}</li>
+ * <li>{@code Nar-Dependency-Id: nar-z}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * Note, above, that the {@code type} of the dependency is set to {@code nar}.
+ * </p>
+ *
+ * <p>
+ * If the NAR has more than one dependency of {@code type} {@code nar}, then the
+ * Maven NAR plugin will fail to build the NAR.
+ * </p>
+ */
+public class NarClassLoader extends URLClassLoader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NarClassLoader.class);
+
+    private static final FileFilter JAR_FILTER = new FileFilter() {
+        @Override
+        public boolean accept(File pathname) {
+            final String nameToTest = pathname.getName().toLowerCase();
+            return nameToTest.endsWith(".jar") && pathname.isFile();
+        }
+    };
+
+    /**
+     * The NAR for which this <tt>ClassLoader</tt> is responsible.
+     */
+    private final File narWorkingDirectory;
+
+    /**
+     * Construct a nar class loader.
+     *
+     * @param narWorkingDirectory directory to explode nar contents to
+     * @throws IllegalArgumentException if the NAR is missing the Java Services
+     * API file for <tt>FlowFileProcessor</tt> implementations.
+     * @throws ClassNotFoundException if any of the <tt>FlowFileProcessor</tt>
+     * implementations defined by the Java Services API cannot be loaded.
+     * @throws IOException if an error occurs while loading the NAR.
+     */
+    public NarClassLoader(final File narWorkingDirectory) throws ClassNotFoundException, IOException {
+        super(new URL[0]);
+        this.narWorkingDirectory = narWorkingDirectory;
+
+        // process the classpath
+        updateClasspath(narWorkingDirectory);
+    }
+
+    /**
+     * Construct a nar class loader with the specific parent.
+     *
+     * @param narWorkingDirectory directory to explode nar contents to
+     * @param parentClassLoader parent class loader of this nar
+     * @throws IllegalArgumentException if the NAR is missing the Java Services
+     * API file for <tt>FlowFileProcessor</tt> implementations.
+     * @throws ClassNotFoundException if any of the <tt>FlowFileProcessor</tt>
+     * implementations defined by the Java Services API cannot be loaded.
+     * @throws IOException if an error occurs while loading the NAR.
+     */
+    public NarClassLoader(final File narWorkingDirectory, final ClassLoader parentClassLoader) throws ClassNotFoundException, IOException {
+        super(new URL[0], parentClassLoader);
+        this.narWorkingDirectory = narWorkingDirectory;
+
+        // process the classpath
+        updateClasspath(narWorkingDirectory);
+    }
+
+    public File getWorkingDirectory() {
+        return narWorkingDirectory;
+    }
+
+    /**
+     * Adds URLs for the resources unpacked from this NAR:
+     * <ul><li>the root: for classes, <tt>META-INF</tt>, etc.</li>
+     * <li><tt>META-INF/dependencies</tt>: for config files, <tt>.so</tt>s,
+     * etc.</li>
+     * <li><tt>META-INF/dependencies/*.jar</tt>: for dependent
+     * libraries</li></ul>
+     *
+     * @param root the root directory of the unpacked NAR.
+     * @throws IOException if the URL list could not be updated.
+     */
+    private void updateClasspath(File root) throws IOException {
+        addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
+
+        File dependencies = new File(root, "META-INF/bundled-dependencies");
+        if (!dependencies.isDirectory()) {
+            LOGGER.warn(narWorkingDirectory + " does not contain META-INF/bundled-dependencies!");
+        }
+        addURL(dependencies.toURI().toURL());
+        if (dependencies.isDirectory()) {
+            for (File libJar : dependencies.listFiles(JAR_FILTER)) {
+                addURL(libJar.toURI().toURL());
+            }
+        }
+    }
+
+    @Override
+    protected String findLibrary(final String libname) {
+        File dependencies = new File(narWorkingDirectory, "META-INF/bundled-dependencies");
+        if (!dependencies.isDirectory()) {
+            LOGGER.warn(narWorkingDirectory + " does not contain META-INF/bundled-dependencies!");
+        }
+
+        final File nativeDir = new File(dependencies, "native");
+        final File libsoFile = new File(nativeDir, "lib" + libname + ".so");
+        final File dllFile = new File(nativeDir, libname + ".dll");
+        final File soFile = new File(nativeDir, libname + ".so");
+        if (libsoFile.exists()) {
+            return libsoFile.getAbsolutePath();
+        } else if (dllFile.exists()) {
+            return dllFile.getAbsolutePath();
+        } else if (soFile.exists()) {
+            return soFile.getAbsolutePath();
+        }
+
+        // not found in the nar. try system native dir
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]";
+    }
+}