You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/07/04 16:55:17 UTC
[3/6] nifi-minifi git commit: MINIFI-38 Removing reliance on
JettyServer in order to add a flow status reporting end point. This also
removes the UI and adds a 'flowstatus' option to minifi.sh to get information
on the current flow from the terminal.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
new file mode 100644
index 0000000..061120c
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.status;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.status.common.BulletinStatus;
+import org.apache.nifi.minifi.commons.status.common.ValidationError;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionHealth;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionStats;
+import org.apache.nifi.minifi.commons.status.connection.ConnectionStatusBean;
+import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceHealth;
+import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceStatus;
+import org.apache.nifi.minifi.commons.status.instance.InstanceHealth;
+import org.apache.nifi.minifi.commons.status.instance.InstanceStats;
+import org.apache.nifi.minifi.commons.status.instance.InstanceStatus;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorHealth;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorStats;
+import org.apache.nifi.minifi.commons.status.processor.ProcessorStatusBean;
+import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskHealth;
+import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskStatus;
+import org.apache.nifi.minifi.commons.status.rpg.InputPortStatus;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupHealth;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStats;
+import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStatusBean;
+import org.apache.nifi.minifi.commons.status.system.ContentRepositoryUsage;
+import org.apache.nifi.minifi.commons.status.system.FlowfileRepositoryUsage;
+import org.apache.nifi.minifi.commons.status.system.GarbageCollectionStatus;
+import org.apache.nifi.minifi.commons.status.system.HeapStatus;
+import org.apache.nifi.minifi.commons.status.system.SystemDiagnosticsStatus;
+import org.apache.nifi.minifi.commons.status.system.SystemProcessorStats;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public final class StatusRequestParser {
+ private StatusRequestParser() { // not instantiable: the class only exposes static helpers
+ }
+
+ /**
+  * Builds the status bean for a single processor, filling in only the sections listed in the
+  * comma-separated {@code statusTypes} ("health", "bulletins", "stats"); other names are ignored.
+  * Names are lower-cased with {@code Locale.ROOT} so matching is independent of the JVM default
+  * locale (under a Turkish default locale "BULLETINS" would otherwise fail to match).
+  */
+ static ProcessorStatusBean parseProcessorStatusRequest(ProcessorStatus inputProcessorStatus, String statusTypes, FlowController flowController, Collection<ValidationResult> validationResults) {
+     ProcessorStatusBean processorStatusBean = new ProcessorStatusBean();
+     processorStatusBean.setName(inputProcessorStatus.getName());
+     String[] statusSplits = statusTypes.split(",");
+     List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+             new BulletinQuery.Builder()
+                     .sourceIdMatches(inputProcessorStatus.getId())
+                     .build());
+     for (String statusType : statusSplits) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 ProcessorHealth processorHealth = new ProcessorHealth();
+                 processorHealth.setRunStatus(inputProcessorStatus.getRunStatus().name());
+                 processorHealth.setHasBulletins(!bulletinList.isEmpty());
+                 processorHealth.setValidationErrorList(transformValidationResults(validationResults));
+                 processorStatusBean.setProcessorHealth(processorHealth);
+                 break;
+             case "bulletins":
+                 processorStatusBean.setBulletinList(transformBulletins(bulletinList));
+                 break;
+             case "stats":
+                 ProcessorStats processorStats = new ProcessorStats();
+                 processorStats.setActiveThreads(inputProcessorStatus.getActiveThreadCount());
+                 processorStats.setFlowfilesReceived(inputProcessorStatus.getFlowFilesReceived());
+                 processorStats.setBytesRead(inputProcessorStatus.getBytesRead());
+                 processorStats.setBytesWritten(inputProcessorStatus.getBytesWritten());
+                 processorStats.setFlowfilesSent(inputProcessorStatus.getFlowFilesSent());
+                 processorStats.setInvocations(inputProcessorStatus.getInvocations());
+                 processorStats.setProcessingNanos(inputProcessorStatus.getProcessingNanos());
+                 processorStatusBean.setProcessorStats(processorStats);
+                 break;
+         }
+     }
+     return processorStatusBean;
+ }
+
+ /**
+  * Builds the status bean for a single remote process group, filling in only the sections listed
+  * in the comma-separated {@code statusTypes} ("health", "bulletins", "authorizationissues",
+  * "inputports", "stats"); other names are ignored. Names are lower-cased with
+  * {@code Locale.ROOT} so that e.g. "authorizationIssues" still matches when the JVM default
+  * locale is Turkish, whose lower-casing of 'I' is not 'i'.
+  */
+ static RemoteProcessGroupStatusBean parseRemoteProcessGroupStatusRequest(RemoteProcessGroupStatus inputRemoteProcessGroupStatus, String statusTypes, FlowController flowController) {
+     RemoteProcessGroupStatusBean remoteProcessGroupStatusBean = new RemoteProcessGroupStatusBean();
+     remoteProcessGroupStatusBean.setName(inputRemoteProcessGroupStatus.getName());
+
+     String rootGroupId = flowController.getRootGroupId();
+     String[] statusSplits = statusTypes.split(",");
+     List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+             new BulletinQuery.Builder()
+                     .sourceIdMatches(inputRemoteProcessGroupStatus.getId())
+                     .build());
+     List<String> authorizationIssues = inputRemoteProcessGroupStatus.getAuthorizationIssues();
+
+     for (String statusType : statusSplits) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 RemoteProcessGroupHealth remoteProcessGroupHealth = new RemoteProcessGroupHealth();
+                 remoteProcessGroupHealth.setTransmissionStatus(inputRemoteProcessGroupStatus.getTransmissionStatus().name());
+                 remoteProcessGroupHealth.setHasAuthorizationIssues(!authorizationIssues.isEmpty());
+                 remoteProcessGroupHealth.setActivePortCount(inputRemoteProcessGroupStatus.getActiveRemotePortCount());
+                 remoteProcessGroupHealth.setInactivePortCount(inputRemoteProcessGroupStatus.getInactiveRemotePortCount());
+                 remoteProcessGroupHealth.setHasBulletins(!bulletinList.isEmpty());
+                 remoteProcessGroupStatusBean.setRemoteProcessGroupHealth(remoteProcessGroupHealth);
+                 break;
+             case "bulletins":
+                 remoteProcessGroupStatusBean.setBulletinList(transformBulletins(bulletinList));
+                 break;
+             case "authorizationissues":
+                 remoteProcessGroupStatusBean.setAuthorizationIssues(authorizationIssues);
+                 break;
+             case "inputports":
+                 // The group is resolved through the root process group only.
+                 List<InputPortStatus> inputPortStatusList = new LinkedList<>();
+                 RemoteProcessGroup remoteProcessGroup = flowController.getGroup(rootGroupId).getRemoteProcessGroup(inputRemoteProcessGroupStatus.getId());
+                 Set<RemoteGroupPort> inputPorts = remoteProcessGroup.getInputPorts();
+                 for (RemoteGroupPort inputPort : inputPorts) {
+                     InputPortStatus inputPortStatus = new InputPortStatus();
+                     inputPortStatus.setName(inputPort.getName());
+                     inputPortStatus.setTargetExists(inputPort.getTargetExists());
+                     inputPortStatus.setTargetRunning(inputPort.isTargetRunning());
+                     inputPortStatusList.add(inputPortStatus);
+                 }
+                 remoteProcessGroupStatusBean.setInputPortStatusList(inputPortStatusList);
+                 break;
+             case "stats":
+                 RemoteProcessGroupStats remoteProcessGroupStats = new RemoteProcessGroupStats();
+                 remoteProcessGroupStats.setActiveThreads(inputRemoteProcessGroupStatus.getActiveThreadCount());
+                 remoteProcessGroupStats.setSentContentSize(inputRemoteProcessGroupStatus.getSentContentSize());
+                 remoteProcessGroupStats.setSentCount(inputRemoteProcessGroupStatus.getSentCount());
+                 remoteProcessGroupStatusBean.setRemoteProcessGroupStats(remoteProcessGroupStats);
+                 break;
+         }
+     }
+     return remoteProcessGroupStatusBean;
+ }
+
+ /**
+  * Builds the status bean for a single connection, filling in only the sections listed in the
+  * comma-separated {@code statusTypes} ("health", "stats"); other names are ignored. Names are
+  * lower-cased with {@code Locale.ROOT} so matching does not vary with the JVM default locale.
+  * The {@code logger} argument is currently unused.
+  */
+ static ConnectionStatusBean parseConnectionStatusRequest(ConnectionStatus inputConnectionStatus, String statusTypes, Logger logger) {
+     ConnectionStatusBean connectionStatusBean = new ConnectionStatusBean();
+     connectionStatusBean.setName(inputConnectionStatus.getName());
+     for (String statusType : statusTypes.split(",")) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 ConnectionHealth connectionHealth = new ConnectionHealth();
+                 connectionHealth.setQueuedBytes(inputConnectionStatus.getQueuedBytes());
+                 connectionHealth.setQueuedCount(inputConnectionStatus.getQueuedCount());
+                 connectionStatusBean.setConnectionHealth(connectionHealth);
+                 break;
+             case "stats":
+                 ConnectionStats connectionStats = new ConnectionStats();
+                 connectionStats.setInputBytes(inputConnectionStatus.getInputBytes());
+                 connectionStats.setInputCount(inputConnectionStatus.getInputCount());
+                 connectionStats.setOutputCount(inputConnectionStatus.getOutputCount());
+                 connectionStats.setOutputBytes(inputConnectionStatus.getOutputBytes());
+                 connectionStatusBean.setConnectionStats(connectionStats);
+                 break;
+         }
+     }
+     return connectionStatusBean;
+ }
+
+ /**
+  * Builds the status of the reporting task {@code id}, filling in only the sections listed in the
+  * comma-separated {@code statusTypes} ("health", "bulletins"); other names are ignored. Names are
+  * lower-cased with {@code Locale.ROOT}, not the default locale; {@code logger} is unused.
+  */
+ static ReportingTaskStatus parseReportingTaskStatusRequest(String id, ReportingTaskNode reportingTaskNode, String statusTypes, FlowController flowController, Logger logger) {
+     ReportingTaskStatus reportingTaskStatus = new ReportingTaskStatus();
+     reportingTaskStatus.setName(id);
+     String[] statusSplits = statusTypes.split(",");
+     List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+             new BulletinQuery.Builder()
+                     .sourceIdMatches(id)
+                     .build());
+     for (String statusType : statusSplits) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 ReportingTaskHealth reportingTaskHealth = new ReportingTaskHealth();
+                 reportingTaskHealth.setScheduledState(reportingTaskNode.getScheduledState().name());
+                 reportingTaskHealth.setActiveThreads(reportingTaskNode.getActiveThreadCount());
+                 reportingTaskHealth.setHasBulletins(!bulletinList.isEmpty());
+                 reportingTaskHealth.setValidationErrorList(transformValidationResults(reportingTaskNode.getValidationErrors()));
+                 reportingTaskStatus.setReportingTaskHealth(reportingTaskHealth);
+                 break;
+             case "bulletins":
+                 reportingTaskStatus.setBulletinList(transformBulletins(bulletinList));
+                 break;
+         }
+     }
+     return reportingTaskStatus;
+ }
+
+ /**
+  * Builds the status of one controller service (named by its identifier), filling in only the
+  * sections listed in the comma-separated {@code statusTypes} ("health", "bulletins"); others are
+  * ignored. Names are lower-cased with {@code Locale.ROOT}; {@code logger} is unused.
+  */
+ static ControllerServiceStatus parseControllerServiceStatusRequest(ControllerServiceNode controllerServiceNode, String statusTypes, FlowController flowController, Logger logger) {
+     ControllerServiceStatus controllerServiceStatus = new ControllerServiceStatus();
+     String id = controllerServiceNode.getIdentifier();
+     controllerServiceStatus.setName(id);
+     String[] statusSplits = statusTypes.split(",");
+     List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins(
+             new BulletinQuery.Builder()
+                     .sourceIdMatches(id)
+                     .build());
+     for (String statusType : statusSplits) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 ControllerServiceHealth controllerServiceHealth = new ControllerServiceHealth();
+                 controllerServiceHealth.setState(controllerServiceNode.getState().name());
+                 controllerServiceHealth.setHasBulletins(!bulletinList.isEmpty());
+                 controllerServiceHealth.setValidationErrorList(transformValidationResults(controllerServiceNode.getValidationErrors()));
+                 controllerServiceStatus.setControllerServiceHealth(controllerServiceHealth);
+                 break;
+             case "bulletins":
+                 controllerServiceStatus.setBulletinList(transformBulletins(bulletinList));
+                 break;
+         }
+     }
+     return controllerServiceStatus;
+ }
+
+ static SystemDiagnosticsStatus parseSystemDiagnosticsRequest(SystemDiagnostics inputSystemDiagnostics, String statusTypes) throws StatusRequestException {
+ if (inputSystemDiagnostics == null) {
+ throw new StatusRequestException("Unable to get system diagnostics");
+ }
+
+ SystemDiagnosticsStatus systemDiagnosticsStatus = new SystemDiagnosticsStatus();
+ String[] statusSplits = statusTypes.split(",");
+
+ for (String statusType : statusSplits) {
+ switch (statusType.toLowerCase().trim()) {
+ case "heap":
+ HeapStatus heapStatus = new HeapStatus();
+ heapStatus.setTotalHeap(inputSystemDiagnostics.getTotalHeap());
+ heapStatus.setMaxHeap(inputSystemDiagnostics.getMaxHeap());
+ heapStatus.setFreeHeap(inputSystemDiagnostics.getFreeHeap());
+ heapStatus.setUsedHeap(inputSystemDiagnostics.getUsedHeap());
+ heapStatus.setHeapUtilization(inputSystemDiagnostics.getHeapUtilization());
+ heapStatus.setTotalNonHeap(inputSystemDiagnostics.getTotalNonHeap());
+ heapStatus.setMaxNonHeap(inputSystemDiagnostics.getMaxNonHeap());
+ heapStatus.setFreeNonHeap(inputSystemDiagnostics.getFreeNonHeap());
+ heapStatus.setUsedNonHeap(inputSystemDiagnostics.getUsedNonHeap());
+ heapStatus.setNonHeapUtilization(inputSystemDiagnostics.getNonHeapUtilization());
+ systemDiagnosticsStatus.setHeapStatus(heapStatus);
+ break;
+ case "processorstats":
+ SystemProcessorStats systemProcessorStats = new SystemProcessorStats();
+ systemProcessorStats.setAvailableProcessors(inputSystemDiagnostics.getAvailableProcessors());
+ systemProcessorStats.setLoadAverage(inputSystemDiagnostics.getProcessorLoadAverage());
+ systemDiagnosticsStatus.setProcessorStatus(systemProcessorStats);
+ break;
+ case "contentrepositoryusage":
+ List<ContentRepositoryUsage> contentRepositoryUsageList = new LinkedList<>();
+ Map<String, StorageUsage> contentRepoStorage = inputSystemDiagnostics.getContentRepositoryStorageUsage();
+
+ for (Map.Entry<String, StorageUsage> stringStorageUsageEntry : contentRepoStorage.entrySet()) {
+ ContentRepositoryUsage contentRepositoryUsage = new ContentRepositoryUsage();
+ StorageUsage storageUsage = stringStorageUsageEntry.getValue();
+
+ contentRepositoryUsage.setName(storageUsage.getIdentifier());
+ contentRepositoryUsage.setFreeSpace(storageUsage.getFreeSpace());
+ contentRepositoryUsage.setTotalSpace(storageUsage.getTotalSpace());
+ contentRepositoryUsage.setDiskUtilization(storageUsage.getDiskUtilization());
+ contentRepositoryUsage.setUsedSpace(storageUsage.getUsedSpace());
+
+ contentRepositoryUsageList.add(contentRepositoryUsage);
+ }
+ systemDiagnosticsStatus.setContentRepositoryUsageList(contentRepositoryUsageList);
+ break;
+ case "flowfilerepositoryusage":
+ FlowfileRepositoryUsage flowfileRepositoryUsage = new FlowfileRepositoryUsage();
+ StorageUsage flowFileRepoStorage = inputSystemDiagnostics.getFlowFileRepositoryStorageUsage();
+
+ flowfileRepositoryUsage.setFreeSpace(flowFileRepoStorage.getFreeSpace());
+ flowfileRepositoryUsage.setTotalSpace(flowFileRepoStorage.getTotalSpace());
+ flowfileRepositoryUsage.setDiskUtilization(flowFileRepoStorage.getDiskUtilization());
+ flowfileRepositoryUsage.setUsedSpace(flowFileRepoStorage.getUsedSpace());
+
+ systemDiagnosticsStatus.setFlowfileRepositoryUsage(flowfileRepositoryUsage);
+ break;
+ case "garbagecollection":
+ List<GarbageCollectionStatus> garbageCollectionStatusList = new LinkedList<>();
+ Map<String, GarbageCollection> garbageCollectionMap = inputSystemDiagnostics.getGarbageCollection();
+
+ for (Map.Entry<String, GarbageCollection> stringGarbageCollectionEntry : garbageCollectionMap.entrySet()) {
+ GarbageCollectionStatus garbageCollectionStatus = new GarbageCollectionStatus();
+ GarbageCollection garbageCollection = stringGarbageCollectionEntry.getValue();
+
+ garbageCollectionStatus.setName(garbageCollection.getName());
+ garbageCollectionStatus.setCollectionCount(garbageCollection.getCollectionCount());
+ garbageCollectionStatus.setCollectionTime(garbageCollection.getCollectionTime());
+
+ garbageCollectionStatusList.add(garbageCollectionStatus);
+ }
+ systemDiagnosticsStatus.setGarbageCollectionStatusList(garbageCollectionStatusList);
+ break;
+ }
+ }
+ return systemDiagnosticsStatus;
+ }
+
+ /**
+  * Builds the instance-level status, filling in only the sections listed in the comma-separated
+  * {@code statusTypes} ("health", "bulletins", "stats"); other names are ignored. The controller
+  * bulletins are fetched once, so the "health" flag and the "bulletins" list always describe the
+  * same snapshot. Names are lower-cased with {@code Locale.ROOT} so matching is independent of
+  * the JVM default locale.
+  */
+ static InstanceStatus parseInstanceRequest(String statusTypes, FlowController flowController, ProcessGroupStatus rootGroupStatus) {
+     InstanceStatus instanceStatus = new InstanceStatus();
+     List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletinsForController();
+     String[] statusSplits = statusTypes.split(",");
+     for (String statusType : statusSplits) {
+         switch (statusType.toLowerCase(java.util.Locale.ROOT).trim()) {
+             case "health":
+                 InstanceHealth instanceHealth = new InstanceHealth();
+                 instanceHealth.setQueuedCount(rootGroupStatus.getQueuedCount());
+                 instanceHealth.setQueuedContentSize(rootGroupStatus.getQueuedContentSize());
+                 instanceHealth.setHasBulletins(!bulletinList.isEmpty());
+                 instanceHealth.setActiveThreads(rootGroupStatus.getActiveThreadCount());
+                 instanceStatus.setInstanceHealth(instanceHealth);
+                 break;
+             case "bulletins":
+                 instanceStatus.setBulletinList(transformBulletins(bulletinList));
+                 break;
+             case "stats":
+                 InstanceStats instanceStats = new InstanceStats();
+                 instanceStats.setBytesRead(rootGroupStatus.getBytesRead());
+                 instanceStats.setBytesWritten(rootGroupStatus.getBytesWritten());
+                 instanceStats.setBytesSent(rootGroupStatus.getBytesSent());
+                 instanceStats.setFlowfilesSent(rootGroupStatus.getFlowFilesSent());
+                 instanceStats.setBytesTransferred(rootGroupStatus.getBytesTransferred());
+                 instanceStats.setFlowfilesTransferred(rootGroupStatus.getFlowFilesTransferred());
+                 instanceStats.setBytesReceived(rootGroupStatus.getBytesReceived());
+                 instanceStats.setFlowfilesReceived(rootGroupStatus.getFlowFilesReceived());
+                 instanceStatus.setInstanceStats(instanceStats);
+                 break;
+         }
+     }
+     return instanceStatus;
+ }
+
+ private static List<ValidationError> transformValidationResults(Collection<ValidationResult> validationResults) {
+ List<ValidationError> validationErrorList = new LinkedList<>();
+ for (ValidationResult validationResult : validationResults) {
+ if (!validationResult.isValid()) {
+ ValidationError validationError = new ValidationError();
+ validationError.setSubject(validationResult.getSubject());
+ validationError.setInput(validationResult.getInput());
+ validationError.setReason(validationResult.getExplanation());
+
+ validationErrorList.add(validationError);
+ }
+ }
+ return validationErrorList;
+ }
+
+ /** Copies the message and timestamp of each bulletin into a new {@link BulletinStatus}, preserving order. */
+ private static List<BulletinStatus> transformBulletins(List<Bulletin> bulletinList) {
+     List<BulletinStatus> statuses = new LinkedList<>();
+     // No emptiness check is needed: iterating an empty list simply leaves the result empty.
+     for (Bulletin bulletin : bulletinList) {
+         BulletinStatus status = new BulletinStatus();
+         status.setMessage(bulletin.getMessage());
+         status.setTimestamp(bulletin.getTimestamp());
+         statuses.add(status);
+     }
+     return statuses;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
new file mode 100644
index 0000000..027d685
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.status;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStatusConfigReporter {
+ private FlowController mockFlowController;
+ private ProcessGroupStatus rootGroupStatus;
+ private BulletinRepository bulletinRepo;
+ private ProcessGroup processGroup;
+
+ @Before
+ public void setup() {
+     bulletinRepo = mock(BulletinRepository.class);
+     processGroup = mock(ProcessGroup.class);
+     rootGroupStatus = mock(ProcessGroupStatus.class);
+     mockFlowController = mock(FlowController.class);
+     // The controller reports "root" as its root group id and hands out the mocks created above.
+     when(mockFlowController.getRootGroupId()).thenReturn("root");
+     when(mockFlowController.getGroup("root")).thenReturn(processGroup);
+     when(mockFlowController.getGroupStatus("root")).thenReturn(rootGroupStatus);
+     when(mockFlowController.getControllerStatus()).thenReturn(rootGroupStatus);
+     when(mockFlowController.getBulletinRepository()).thenReturn(bulletinRepo);
+ }
+
+ @Test
+ public void processorStatusHealth() throws Exception {
+     populateProcessor(false, false);
+
+     // The expected report carries an empty error list plus the processor status added below.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addProcessorStatus(expectedReport, true, false, false, false, false);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "processor:all:health", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void processorStatusWithValidationErrors() throws Exception {
+     populateProcessor(true, false);
+
+     // The expected report carries an empty error list plus the processor status added below.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addProcessorStatus(expectedReport, true, true, false, false, false);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "processor:all:health", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void processorStatusAll() throws Exception {
+     populateProcessor(true, true);
+
+     // The request below lists three sections separated by ", ".
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addProcessorStatus(expectedReport, true, true, true, true, true);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "processor:all:health, stats, bulletins", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void connectionStatusHealth() throws Exception {
+     populateConnection();
+
+     // The expected report carries an empty error list plus the connection status added below.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addConnectionStatus(expectedReport, true, false);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "connection:all:health", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+
+ @Test
+ public void connectionStatusAll() throws Exception {
+     populateConnection();
+     // The request below lists two sections separated by ", ".
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addConnectionStatus(expectedReport, true, true);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "connection:all:health, stats", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void connectionAndProcessorStatusHealth() throws Exception {
+     // Populate both the connection and the processor fixtures before querying.
+     populateConnection();
+     populateProcessor(false, false);
+
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     addConnectionStatus(expectedReport, true, false);
+     addProcessorStatus(expectedReport, true, false, false, false, false);
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+
+     // A single request string carries both queries, separated by "; ".
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController,
+             "connection:connectionId:health; processor:UpdateAttributeProcessorId:health",
+             LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void provenanceReportingTaskStatusHealth() throws Exception {
+ populateReportingTask(false, false);
+
+ String statusRequest = "provenanceReporting:health";
+ FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+ FlowStatusReport expected = new FlowStatusReport();
+ expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+ addReportingTaskStatus(expected, true, false, false, false);
+
+ assertEquals(expected, actual);
+ }
+
+
+ @Test
+ public void provenanceReportingTaskStatusBulletins() throws Exception {
+     populateReportingTask(true, false);
+
+     // The expected report carries an empty error list plus the reporting task status added below.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addReportingTaskStatus(expectedReport, false, false, true, true);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "provenanceReporting:bulletins", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void provenanceReportingTaskStatusAll() throws Exception {
+     populateReportingTask(true, true);
+
+     // The request below lists two sections separated by "," with no space.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addReportingTaskStatus(expectedReport, true, true, true, true);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "provenanceReporting:health,bulletins", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void systemDiagnosticHeap() throws Exception {
+     populateSystemDiagnostics();
+
+     // The expected report carries an empty error list plus the diagnostics status added below.
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addSystemDiagnosticStatus(expectedReport, true, false, false, false, false);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "systemDiagnostics:heap", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+ @Test
+ public void systemDiagnosticProcessorStats() throws Exception {
+     populateSystemDiagnostics();
+
+     // The section name in the request below is written in mixed case ("processorStats").
+     FlowStatusReport expectedReport = new FlowStatusReport();
+     expectedReport.setErrorsGeneratingReport(Collections.EMPTY_LIST);
+     addSystemDiagnosticStatus(expectedReport, false, true, false, false, false);
+
+     FlowStatusReport actualReport = StatusConfigReporter.getStatus(
+             mockFlowController, "systemDiagnostics:processorStats", LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+     assertEquals(expectedReport, actualReport);
+ }
+
+    /**
+     * Requests only the {@code flowfilerepositoryusage} section of the system diagnostics and checks
+     * the generated report against the expectation built by {@code addSystemDiagnosticStatus}.
+     */
+    @Test
+    public void systemDiagnosticFlowFileRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:flowfilerepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addSystemDiagnosticStatus(expected, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code contentrepositoryusage} section of the system diagnostics and checks
+     * the generated report against the expectation built by {@code addSystemDiagnosticStatus}.
+     */
+    @Test
+    public void systemDiagnosticContentRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:contentrepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addSystemDiagnosticStatus(expected, false, false, false, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code garbagecollection} section of the system diagnostics and checks the
+     * generated report against the expectation built by {@code addSystemDiagnosticStatus}.
+     */
+    @Test
+    public void systemDiagnosticGarbageCollection() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:garbagecollection";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addSystemDiagnosticStatus(expected, false, false, false, false, true);
+
+        assertEquals(expected, actual);
+    }
+
+
+    /**
+     * Requests every system diagnostics section in a single, space-separated option list and checks
+     * the generated report against the expectation built by {@code addSystemDiagnosticStatus}.
+     */
+    @Test
+    public void systemDiagnosticAll() throws Exception {
+        populateSystemDiagnostics();
+
+        String statusRequest = "systemDiagnostics:garbagecollection, heap, processorstats, contentrepositoryusage, flowfilerepositoryusage";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code health} section of the instance status and checks the generated
+     * report against the expectation built by {@code addInstanceStatus}.
+     */
+    @Test
+    public void instanceStatusHealth() throws Exception {
+        populateInstance(false);
+
+        String statusRequest = "instance:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+        addInstanceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code bulletins} section of the instance status, with a controller-level
+     * bulletin stubbed, and checks the generated report against the expectation built by
+     * {@code addInstanceStatus}.
+     */
+    @Test
+    public void instanceStatusBulletins() throws Exception {
+        populateInstance(true);
+
+        String statusRequest = "instance:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addInstanceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code stats} section of the instance status and checks the generated
+     * report against the expectation built by {@code addInstanceStatus}.
+     */
+    @Test
+    public void instanceStatusStats() throws Exception {
+        populateInstance(false);
+
+        String statusRequest = "instance:stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addInstanceStatus(expected, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests the {@code stats}, {@code bulletins} and {@code health} sections of the instance
+     * status together and checks the generated report against the expectation built by
+     * {@code addInstanceStatus}.
+     */
+    @Test
+    public void instanceStatusAll() throws Exception {
+        populateInstance(true);
+
+        String statusRequest = "instance:stats, bulletins, health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addInstanceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code health} section for controller services and checks the generated
+     * report against the expectation built by {@code addControllerServiceStatus}.
+     */
+    @Test
+    public void controllerServiceStatusHealth() throws Exception {
+        populateControllerService(false, false);
+
+        String statusRequest = "controllerServices:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addControllerServiceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code bulletins} section for controller services, with a bulletin stubbed
+     * for the mocked service, and checks the generated report against the expectation built by
+     * {@code addControllerServiceStatus}.
+     */
+    @Test
+    public void controllerServiceStatusBulletins() throws Exception {
+        populateControllerService(false, true);
+
+        String statusRequest = "controllerServices:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addControllerServiceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests the {@code bulletins} and {@code health} sections for controller services, with both
+     * validation errors and a bulletin stubbed, and checks the generated report against the
+     * expectation built by {@code addControllerServiceStatus}.
+     */
+    @Test
+    public void controllerServiceStatusAll() throws Exception {
+        populateControllerService(true, true);
+
+        String statusRequest = "controllerServices:bulletins, health";
+
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addControllerServiceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code health} section for all remote process groups and checks the
+     * generated report against the expectation built by {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusHealth() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:health";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, true, false, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code bulletins} section for all remote process groups, with a bulletin
+     * stubbed for the group, and checks the generated report against the expectation built by
+     * {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusBulletins() throws Exception {
+        populateRemoteProcessGroup(true, false);
+
+        String statusRequest = "remoteProcessGroup:all:bulletins";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code authorizationissues} section for all remote process groups, with an
+     * authorization issue stubbed, and checks the generated report against the expectation built by
+     * {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusAuthorizationIssues() throws Exception {
+        populateRemoteProcessGroup(false, true);
+
+        String statusRequest = "remoteProcessGroup:all:authorizationissues";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, false, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code inputPorts} section for all remote process groups and checks the
+     * generated report against the expectation built by {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusInputPorts() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:inputPorts";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Requests only the {@code stats} section for all remote process groups and checks the
+     * generated report against the expectation built by {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusStats() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        String statusRequest = "remoteProcessGroup:all:stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+    /**
+     * Requests every remote process group section at once, with both a bulletin and an
+     * authorization issue stubbed, and checks the generated report against the expectation built by
+     * {@code addExpectedRemoteProcessGroupStatus}.
+     */
+    @Test
+    public void remoteProcessGroupStatusAll() throws Exception {
+        populateRemoteProcessGroup(true, true);
+
+        String statusRequest = "remoteProcessGroup:all:health, authorizationissues, bulletins, inputPorts, stats";
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Issues one semicolon-separated request covering controller services, processors, the instance,
+     * system diagnostics, connections, provenance reporting and remote process groups, and checks the
+     * combined report against an expectation assembled from the individual {@code add*Status} helpers.
+     */
+    @Test
+    public void statusEverything() throws Exception {
+        // No component-specific bulletins in this scenario: every bulletin query yields an empty list.
+        when(bulletinRepo.findBulletins(anyObject())).thenReturn(Collections.emptyList());
+
+        populateControllerService(true, false);
+        populateInstance(true);
+        populateSystemDiagnostics();
+        populateReportingTask(false, true);
+        populateConnection();
+        populateProcessor(true, false);
+        populateRemoteProcessGroup(false, true);
+
+        String statusRequest = "controllerServices:bulletins,health; processor:all:health,stats,bulletins; instance:bulletins,health,stats ; systemDiagnostics:garbagecollection, heap, " +
+                "processorstats, contentrepositoryusage, flowfilerepositoryusage; connection:all:health,stats; provenanceReporting:health,bulletins; remoteProcessGroup:all:health, " +
+                "authorizationissues, bulletins, inputPorts, stats";
+
+        FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
+
+        FlowStatusReport expected = new FlowStatusReport();
+        // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+
+        addControllerServiceStatus(expected, true, true, true, false);
+        addInstanceStatus(expected, true, true, true, true);
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+        addReportingTaskStatus(expected, true, true, true, false);
+        addConnectionStatus(expected, true, true);
+        addProcessorStatus(expected, true, true, true, true, false);
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+ /***************************
+ * Populator methods
+ *************************/
+
+ private void addBulletinsToInstance() {
+ Bulletin bulletin = mock(Bulletin.class);
+ when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
+ when(bulletin.getMessage()).thenReturn("Bulletin message");
+
+ List<Bulletin> bulletinList = new ArrayList<>();
+ bulletinList.add(bulletin);
+
+ when(bulletinRepo.findBulletinsForController()).thenReturn(bulletinList);
+ }
+
+ private void populateSystemDiagnostics() {
+ SystemDiagnostics systemDiagnostics = new SystemDiagnostics();
+ addGarbageCollectionToSystemDiagnostics(systemDiagnostics);
+ addHeapSystemDiagnostics(systemDiagnostics);
+ addContentRepoToSystemDiagnostics(systemDiagnostics);
+ addFlowFileRepoToSystemDiagnostics(systemDiagnostics);
+ addProcessorInfoToSystemDiagnostics(systemDiagnostics);
+ when(mockFlowController.getSystemDiagnostics()).thenReturn(systemDiagnostics);
+ }
+
+ private void populateControllerService(boolean validationErrors, boolean addBulletins) {
+ ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
+ addControllerServiceHealth(controllerServiceNode);
+ if (validationErrors) {
+ addValidationErrors(controllerServiceNode);
+ }
+
+ if (addBulletins) {
+ addBulletins("Bulletin message", controllerServiceNode.getIdentifier());
+ }
+ HashSet<ControllerServiceNode> controllerServiceNodes = new HashSet<>();
+ controllerServiceNodes.add(controllerServiceNode);
+ when(mockFlowController.getAllControllerServices()).thenReturn(controllerServiceNodes);
+ }
+
+    /**
+     * Stubs the root group status figures and, optionally, a controller-level bulletin.
+     *
+     * @param addBulletins whether a controller-level bulletin should be stubbed
+     */
+    private void populateInstance(boolean addBulletins) {
+        setRootGroupStatusVariables();
+
+        if (!addBulletins) {
+            return;
+        }
+        addBulletinsToInstance();
+    }
+
+    /**
+     * Registers one mocked reporting task ("ReportProvenance") with the mocked flow controller.
+     *
+     * @param addBulletins whether a bulletin should be stubbed for the task's identifier
+     * @param validationErrors whether the task should report validation errors
+     */
+    private void populateReportingTask(boolean addBulletins, boolean validationErrors) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "ReportProvenance");
+        }
+
+        ReportingTaskNode reportingTaskNode = mock(ReportingTaskNode.class);
+        // Also stubs getValidationErrors() with an empty list, which stays in effect unless
+        // validation errors are requested below.
+        addReportingTaskNodeVariables(reportingTaskNode);
+
+        HashSet<ReportingTaskNode> reportingTaskNodes = new HashSet<>();
+        reportingTaskNodes.add(reportingTaskNode);
+
+        when(mockFlowController.getAllReportingTasks()).thenReturn(reportingTaskNodes);
+
+        if (validationErrors) {
+            // Reuse the shared fixture instead of duplicating the two ValidationResult builders here.
+            addValidationErrors(reportingTaskNode);
+        }
+    }
+
+ private void populateConnection() {
+ ConnectionStatus connectionStatus = new ConnectionStatus();
+ connectionStatus.setQueuedBytes(100);
+ connectionStatus.setId("connectionId");
+ connectionStatus.setName("connectionId");
+ connectionStatus.setQueuedCount(10);
+ connectionStatus.setInputCount(1);
+ connectionStatus.setInputBytes(2);
+ connectionStatus.setOutputCount(3);
+ connectionStatus.setOutputBytes(4);
+
+ Collection<ConnectionStatus> statusCollection = new ArrayList<>();
+ statusCollection.add(connectionStatus);
+
+ when(rootGroupStatus.getConnectionStatus()).thenReturn(statusCollection);
+ }
+
+    /**
+     * Makes the mocked root group status report a single stopped UpdateAttribute processor and
+     * makes the mocked process group return a matching mocked processor node.
+     *
+     * @param validationErrors whether the processor node should report validation errors
+     * @param addBulletins whether a bulletin should be stubbed for the processor's identifier
+     */
+    private void populateProcessor(boolean validationErrors, boolean addBulletins) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "UpdateAttributeProcessorId");
+        }
+
+        ProcessorStatus processorStatus = new ProcessorStatus();
+        processorStatus.setType("org.apache.nifi.processors.attributes.UpdateAttribute");
+        processorStatus.setId("UpdateAttributeProcessorId");
+        processorStatus.setName("UpdateAttributeProcessorId");
+        processorStatus.setRunStatus(RunStatus.Stopped);
+        processorStatus.setActiveThreadCount(1);
+        processorStatus.setFlowFilesReceived(2);
+        processorStatus.setBytesRead(3);
+        processorStatus.setBytesWritten(4);
+        processorStatus.setFlowFilesSent(5);
+        processorStatus.setInvocations(6);
+        processorStatus.setProcessingNanos(7);
+
+        Collection<ProcessorStatus> statusCollection = new ArrayList<>();
+        statusCollection.add(processorStatus);
+        when(rootGroupStatus.getProcessorStatus()).thenReturn(statusCollection);
+
+        // A single stub for getProcessor(id) is enough: the previous extra call to
+        // mockProcessorEmptyValidation stubbed the same lookup and was overridden right away.
+        ProcessorNode processorNode = mock(ProcessorNode.class);
+        when(processGroup.getProcessor(processorStatus.getId())).thenReturn(processorNode);
+
+        if (validationErrors) {
+            // Reuse the shared fixture instead of duplicating the two ValidationResult builders here.
+            addValidationErrors(processorNode);
+        } else {
+            when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        }
+    }
+
+    /**
+     * Stubs one remote process group ("rpg1") with a single input port, both on the mocked process
+     * group and in the mocked root group status.
+     *
+     * @param addBulletins whether a bulletin should be stubbed for the group's identifier
+     * @param addAuthIssues whether the group status should carry an authorization issue
+     */
+    private void populateRemoteProcessGroup(boolean addBulletins, boolean addAuthIssues) {
+        when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
+
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        when(processGroup.getRemoteProcessGroup(any())).thenReturn(remoteProcessGroup);
+
+        RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
+        when(remoteGroupPort.getName()).thenReturn("inputPort");
+        when(remoteGroupPort.getTargetExists()).thenReturn(true);
+        when(remoteGroupPort.isTargetRunning()).thenReturn(false);
+
+        when(remoteProcessGroup.getInputPorts()).thenReturn(Collections.singleton(remoteGroupPort));
+
+        RemoteProcessGroupStatus remoteProcessGroupStatus = new RemoteProcessGroupStatus();
+        addRemoteProcessGroupStatus(remoteProcessGroupStatus);
+        if (addAuthIssues) {
+            remoteProcessGroupStatus.setAuthorizationIssues(Collections.singletonList("auth issue"));
+        } else {
+            // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+            remoteProcessGroupStatus.setAuthorizationIssues(Collections.emptyList());
+        }
+        if (addBulletins) {
+            addBulletins("Bulletin message", remoteProcessGroupStatus.getId());
+        }
+        when(rootGroupStatus.getRemoteProcessGroupStatus()).thenReturn(Collections.singletonList(remoteProcessGroupStatus));
+    }
+
+
+    /**
+     * Stubs the queue, thread, byte and FlowFile counters of the mocked root group status with
+     * fixed values.
+     */
+    private void setRootGroupStatusVariables() {
+        // Queue and thread figures
+        when(rootGroupStatus.getQueuedCount()).thenReturn(2);
+        when(rootGroupStatus.getQueuedContentSize()).thenReturn(1L);
+        when(rootGroupStatus.getActiveThreadCount()).thenReturn(3);
+
+        // Byte counters
+        when(rootGroupStatus.getBytesRead()).thenReturn(1L);
+        when(rootGroupStatus.getBytesWritten()).thenReturn(2L);
+        when(rootGroupStatus.getBytesSent()).thenReturn(3L);
+        when(rootGroupStatus.getBytesTransferred()).thenReturn(5L);
+        when(rootGroupStatus.getBytesReceived()).thenReturn(7L);
+
+        // FlowFile counters
+        when(rootGroupStatus.getFlowFilesSent()).thenReturn(4);
+        when(rootGroupStatus.getFlowFilesTransferred()).thenReturn(6);
+        when(rootGroupStatus.getFlowFilesReceived()).thenReturn(8);
+    }
+
+ private void addGarbageCollectionToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+ Map<String, GarbageCollection> garbageCollectionMap = new HashMap<>();
+
+ GarbageCollection garbageCollection1 = new GarbageCollection();
+ garbageCollection1.setCollectionCount(1);
+ garbageCollection1.setCollectionTime(10);
+ garbageCollection1.setName("garbage 1");
+ garbageCollectionMap.put(garbageCollection1.getName(), garbageCollection1);
+
+ systemDiagnostics.setGarbageCollection(garbageCollectionMap);
+ }
+
+ private void addContentRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+ Map<String, StorageUsage> stringStorageUsageMap = new HashMap<>();
+
+ StorageUsage repoUsage1 = new StorageUsage();
+ repoUsage1.setFreeSpace(30);
+ repoUsage1.setTotalSpace(100);
+ repoUsage1.setIdentifier("Content repo1");
+ stringStorageUsageMap.put(repoUsage1.getIdentifier(), repoUsage1);
+
+ systemDiagnostics.setContentRepositoryStorageUsage(stringStorageUsageMap);
+ }
+
+ private void addFlowFileRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+ StorageUsage repoUsage = new StorageUsage();
+ repoUsage.setFreeSpace(30);
+ repoUsage.setTotalSpace(100);
+ repoUsage.setIdentifier("FlowFile repo");
+ systemDiagnostics.setFlowFileRepositoryStorageUsage(repoUsage);
+ }
+
+    /**
+     * Sets fixed heap and non-heap memory figures on the diagnostics.
+     *
+     * @param systemDiagnostics the diagnostics fixture to update
+     */
+    private void addHeapSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        // Heap
+        systemDiagnostics.setUsedHeap(2);
+        systemDiagnostics.setTotalHeap(3);
+        systemDiagnostics.setMaxHeap(5);
+
+        // Non-heap
+        systemDiagnostics.setUsedNonHeap(6);
+        systemDiagnostics.setTotalNonHeap(8);
+        systemDiagnostics.setMaxNonHeap(9);
+    }
+
+    /**
+     * Sets a fixed available-processor count and processor load average on the diagnostics.
+     *
+     * @param systemDiagnostics the diagnostics fixture to update
+     */
+    private void addProcessorInfoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        systemDiagnostics.setAvailableProcessors(5);
+        systemDiagnostics.setProcessorLoadAverage(80.9);
+    }
+
+ private void mockProcessorEmptyValidation(String id, ProcessGroup processGroup) {
+ ProcessorNode processorNode = mock(ProcessorNode.class);
+ when(processGroup.getProcessor(id)).thenReturn(processorNode);
+ when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList());
+ }
+
+    /**
+     * Stubs identifier, name, state (ENABLED) and an empty validation error list on the given
+     * mocked controller service node.
+     *
+     * @param controllerServiceNode the mocked node to stub
+     */
+    private void addControllerServiceHealth(ControllerServiceNode controllerServiceNode) {
+        when(controllerServiceNode.getIdentifier()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getName()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        when(controllerServiceNode.getState()).thenReturn(ControllerServiceState.ENABLED);
+    }
+
+    /**
+     * Stubs identifier, name, scheduled state (RUNNING), active thread count and an empty
+     * validation error list on the given mocked reporting task node.
+     *
+     * @param reportingTaskNode the mocked node to stub
+     */
+    private void addReportingTaskNodeVariables(ReportingTaskNode reportingTaskNode) {
+        when(reportingTaskNode.getIdentifier()).thenReturn("ReportProvenance");
+        when(reportingTaskNode.getName()).thenReturn("ReportProvenance");
+        when(reportingTaskNode.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+        when(reportingTaskNode.getActiveThreadCount()).thenReturn(1);
+        when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList());
+    }
+
+    /**
+     * Fills the given remote process group status with the fixed identity, transmission and
+     * throughput figures used by the remote process group tests.
+     *
+     * @param remoteProcessGroupStatus the status object to fill
+     */
+    private void addRemoteProcessGroupStatus(RemoteProcessGroupStatus remoteProcessGroupStatus) {
+        // Identity
+        remoteProcessGroupStatus.setId("rpg1");
+        remoteProcessGroupStatus.setName("rpg1");
+
+        // Transmission state and port counts
+        remoteProcessGroupStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
+        remoteProcessGroupStatus.setActiveRemotePortCount(1);
+        remoteProcessGroupStatus.setInactiveRemotePortCount(2);
+
+        // Throughput
+        remoteProcessGroupStatus.setActiveThreadCount(3);
+        remoteProcessGroupStatus.setSentCount(5);
+        remoteProcessGroupStatus.setSentContentSize(4L);
+    }
+
+ private void addBulletins(String message, String sourceId) {
+ Bulletin bulletin = mock(Bulletin.class);
+ when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
+ when(bulletin.getMessage()).thenReturn(message);
+
+ List<Bulletin> bulletinList = new ArrayList<>();
+ bulletinList.add(bulletin);
+
+ BulletinQueryAnswer bulletinQueryAnswer = new BulletinQueryAnswer(sourceId, bulletinList);
+ when(bulletinRepo.findBulletins(anyObject())).then(bulletinQueryAnswer);
+ }
+
+ private void addValidationErrors(ConfiguredComponent connectable) {
+ ValidationResult validationResult = new ValidationResult.Builder()
+ .input("input")
+ .subject("subject")
+ .explanation("is not valid")
+ .build();
+
+ ValidationResult validationResult2 = new ValidationResult.Builder()
+ .input("input2")
+ .subject("subject2")
+ .explanation("is not valid too")
+ .build();
+
+ List<ValidationResult> validationResultList = new ArrayList<>();
+ validationResultList.add(validationResult);
+ validationResultList.add(validationResult2);
+ when(connectable.getValidationErrors()).thenReturn(validationResultList);
+ }
+
+    /**
+     * Mockito answer for bulletin lookups: returns the configured bulletins when the query's source
+     * id pattern equals the configured id, and an empty list otherwise.
+     *
+     * <p>The answer tolerates a {@code null} query and a query without a source id pattern. A
+     * {@code null} argument reaches this answer whenever {@code findBulletins(anyObject())} is
+     * stubbed again after this answer has been installed, because the matcher call passes
+     * {@code null} to the existing stub.
+     */
+    private static final class BulletinQueryAnswer implements Answer<List<Bulletin>> {
+
+        private final List<Bulletin> bulletinList;
+        private final String idToMatch;
+
+        private BulletinQueryAnswer(String idToMatch, List<Bulletin> bulletinList) {
+            this.idToMatch = idToMatch;
+            this.bulletinList = bulletinList;
+        }
+
+        /**
+         * @param invocationOnMock the intercepted call; its first argument is expected to be the bulletin query
+         * @return the configured bulletins on an id match, otherwise an empty list
+         */
+        @Override
+        public List<Bulletin> answer(InvocationOnMock invocationOnMock) throws Throwable {
+            final Object argument = invocationOnMock.getArguments()[0];
+            if (!(argument instanceof BulletinQuery)) {
+                // Covers the null argument produced by matcher-based re-stubbing.
+                return Collections.emptyList();
+            }
+
+            final BulletinQuery bulletinQuery = (BulletinQuery) argument;
+            if (bulletinQuery.getSourceIdPattern() != null
+                    && idToMatch.equals(bulletinQuery.getSourceIdPattern().toString())) {
+                return bulletinList;
+            }
+            return Collections.emptyList();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
new file mode 100644
index 0000000..c8b497b
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Build settings are inherited from the minifi-framework parent module. -->
+ <parent>
+ <artifactId>minifi-framework</artifactId>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>minifi-nar-utils</artifactId>
+
+ <packaging>jar</packaging>
+ <!-- No versions are declared on the dependencies below; presumably they are managed by a parent POM (TODO confirm). -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-properties</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
new file mode 100644
index 0000000..db0b35e
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.nifi.authentication.LoginIdentityProvider;
+
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.ReportingTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovers extension implementations through the {@link ServiceLoader} API, using the system class loader and
+ * every NAR class loader, and records which class loader each implementation class was first registered from.
+ *
+ * <p>This class is <b>not</b> immutable: its static lookup tables are plain {@link HashMap}s that are filled by
+ * {@link #discoverExtensions()} without any synchronization. Discovery should therefore complete before other
+ * threads query this class.
+ */
+@SuppressWarnings("rawtypes")
+public class ExtensionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExtensionManager.class);
+
+    // Maps a service definition (interface) to those classes that implement the interface
+    private static final Map<Class, Set<Class>> definitionMap = new HashMap<>();
+
+    // Maps the fully qualified name of an implementation class to the class loader it was first registered from
+    private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>();
+
+    static {
+        definitionMap.put(Processor.class, new HashSet<Class>());
+        definitionMap.put(FlowFilePrioritizer.class, new HashSet<Class>());
+        definitionMap.put(ReportingTask.class, new HashSet<Class>());
+        definitionMap.put(ControllerService.class, new HashSet<Class>());
+        definitionMap.put(AuthorityProvider.class, new HashSet<Class>());
+        definitionMap.put(LoginIdentityProvider.class, new HashSet<Class>());
+        definitionMap.put(ProvenanceEventRepository.class, new HashSet<Class>());
+        definitionMap.put(ComponentStatusRepository.class, new HashSet<Class>());
+        definitionMap.put(FlowFileRepository.class, new HashSet<Class>());
+        definitionMap.put(FlowFileSwapManager.class, new HashSet<Class>());
+        definitionMap.put(ContentRepository.class, new HashSet<Class>());
+    }
+
+    /**
+     * Loads the implementations of every registered service definition that can be found through the system
+     * class loader and through each NAR class loader.
+     *
+     * <p>While a NAR class loader is being scanned it is installed as the thread's context class loader. The
+     * previous context class loader is put back afterwards, even when discovery fails with an exception.
+     */
+    public static void discoverExtensions() {
+        final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
+
+        // remember the current context class loader so it can be restored
+        final Thread currentThread = Thread.currentThread();
+        final ClassLoader currentContextClassLoader = currentThread.getContextClassLoader();
+
+        try {
+            // consider the system class loader
+            loadExtensions(systemClassLoader);
+
+            // consider each nar class loader
+            for (final ClassLoader ncl : NarClassLoaders.getExtensionClassLoaders()) {
+
+                // Must set the context class loader to the nar classloader itself
+                // so that static initialization techniques that depend on the context class loader will work properly
+                currentThread.setContextClassLoader(ncl);
+                loadExtensions(ncl);
+            }
+        } finally {
+            // restore the previous context class loader (when there was one), also on failure
+            if (currentContextClassLoader != null) {
+                currentThread.setContextClassLoader(currentContextClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Loads extensions from the specified class loader.
+     *
+     * @param classLoader from which to load extensions
+     */
+    @SuppressWarnings("unchecked")
+    private static void loadExtensions(final ClassLoader classLoader) {
+        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
+            final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), classLoader);
+
+            for (final Object o : serviceLoader) {
+                registerServiceClass(o.getClass(), extensionClassloaderLookup, classLoader, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Registers extension for the specified type from the specified ClassLoader.
+     *
+     * <p>A class name that is already registered is left untouched. A warning is logged unless the class loader
+     * it was registered from is an ancestor of {@code classLoader}.
+     *
+     * @param type the extension type
+     * @param classloaderMap mapping of classname to classloader
+     * @param classLoader the classloader being mapped to
+     * @param classes implementations recorded for the service definition; {@code type} is added on first registration
+     */
+    private static void registerServiceClass(final Class<?> type, final Map<String, ClassLoader> classloaderMap, final ClassLoader classLoader, final Set<Class> classes) {
+        final String className = type.getName();
+        final ClassLoader registeredClassLoader = classloaderMap.get(className);
+
+        // see if this class is already registered (this should happen when the class is loaded by an ancestor of the specified classloader)
+        if (registeredClassLoader == null) {
+            classloaderMap.put(className, classLoader);
+            classes.add(type);
+            return;
+        }
+
+        // if this class was loaded from a non ancestor class loader, report potential unexpected behavior
+        if (!isAncestor(registeredClassLoader, classLoader)) {
+            logger.warn("Attempt was made to load {} from {} but that class name is already loaded/registered from {}"
+                    + ". This may cause unpredictable behavior. Order of NARs is not guaranteed.",
+                    className, classLoader, registeredClassLoader);
+        }
+    }
+
+    /**
+     * Walks the parent chain of {@code classLoader} looking for {@code candidate}.
+     *
+     * @param candidate the class loader to look for
+     * @param classLoader the class loader whose parents are inspected (the loader itself is not considered)
+     * @return true if {@code candidate} is one of the parents of {@code classLoader}
+     */
+    private static boolean isAncestor(final ClassLoader candidate, final ClassLoader classLoader) {
+        ClassLoader ancestorClassLoader = classLoader.getParent();
+        while (ancestorClassLoader != null) {
+            if (ancestorClassLoader == candidate) {
+                return true;
+            }
+            ancestorClassLoader = ancestorClassLoader.getParent();
+        }
+        return false;
+    }
+
+    /**
+     * Determines the effective classloader for classes of the given type. If returns null it indicates the given type is not known or was not detected.
+     *
+     * @param classType fully qualified class name to lookup the classloader of
+     * @return the class loader the class was registered from; null if not a detected type
+     */
+    public static ClassLoader getClassLoader(final String classType) {
+        return extensionClassloaderLookup.get(classType);
+    }
+
+    /**
+     * Returns the implementations recorded for the given service definition.
+     *
+     * @param definition the service definition (interface) to look up
+     * @return the internal set of implementation classes (not a copy), or an empty set for an unknown definition
+     */
+    public static Set<Class> getExtensions(final Class<?> definition) {
+        final Set<Class> extensions = definitionMap.get(definition);
+        return (extensions == null) ? Collections.<Class>emptySet() : extensions;
+    }
+
+    /**
+     * Logs, at info level, every recorded implementation class together with its class loader, grouped by
+     * service definition.
+     */
+    public static void logClassLoaderMapping() {
+        final StringBuilder builder = new StringBuilder();
+
+        builder.append("Extension Type Mapping to Classloader:");
+        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
+            builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" type || Classloader ===");
+
+            for (final Class type : entry.getValue()) {
+                builder.append("\n\t").append(type.getName()).append(" || ").append(getClassLoader(type.getName()));
+            }
+
+            builder.append("\n\t=== End ").append(entry.getKey().getSimpleName()).append(" types ===");
+        }
+
+        logger.info(builder.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
new file mode 100644
index 0000000..c478d97
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds three lists of extension names: processors, controller services and
+ * reporting tasks. Names are kept in insertion order and duplicates are not
+ * filtered. The mutators are package-private; the public accessors expose
+ * read-only views or copies.
+ */
+public class ExtensionMapping {
+
+    private final List<String> processors = new ArrayList<>();
+    private final List<String> controllerServices = new ArrayList<>();
+    private final List<String> reportingTasks = new ArrayList<>();
+
+    /** Appends a single processor name. */
+    void addProcessor(final String processorName) {
+        processors.add(processorName);
+    }
+
+    /** Appends every processor name in the given collection, in its iteration order. */
+    void addAllProcessors(final Collection<String> processorNames) {
+        processors.addAll(processorNames);
+    }
+
+    /** Appends a single controller service name. */
+    void addControllerService(final String controllerServiceName) {
+        controllerServices.add(controllerServiceName);
+    }
+
+    /** Appends every controller service name in the given collection, in its iteration order. */
+    void addAllControllerServices(final Collection<String> controllerServiceNames) {
+        controllerServices.addAll(controllerServiceNames);
+    }
+
+    /** Appends a single reporting task name. */
+    void addReportingTask(final String reportingTaskName) {
+        reportingTasks.add(reportingTaskName);
+    }
+
+    /** Appends every reporting task name in the given collection, in its iteration order. */
+    void addAllReportingTasks(final Collection<String> reportingTaskNames) {
+        reportingTasks.addAll(reportingTaskNames);
+    }
+
+    /**
+     * @return an unmodifiable view of the processor names; later additions show through
+     */
+    public List<String> getProcessorNames() {
+        final List<String> readOnly = Collections.unmodifiableList(processors);
+        return readOnly;
+    }
+
+    /**
+     * @return an unmodifiable view of the controller service names; later additions show through
+     */
+    public List<String> getControllerServiceNames() {
+        final List<String> readOnly = Collections.unmodifiableList(controllerServices);
+        return readOnly;
+    }
+
+    /**
+     * @return an unmodifiable view of the reporting task names; later additions show through
+     */
+    public List<String> getReportingTaskNames() {
+        final List<String> readOnly = Collections.unmodifiableList(reportingTasks);
+        return readOnly;
+    }
+
+    /**
+     * @return a new, modifiable list holding the processor names, then the controller
+     *         service names, then the reporting task names; changes to the returned
+     *         list do not affect this mapping
+     */
+    public List<String> getAllExtensionNames() {
+        final List<String> combined = new ArrayList<>(processors);
+        combined.addAll(controllerServices);
+        combined.addAll(reportingTasks);
+        return combined;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
new file mode 100644
index 0000000..f3be55b
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * <p>
+ * A <tt>ClassLoader</tt> for loading NARs (NiFi archives). NARs are designed to
+ * allow isolating bundles of code (comprising one-or-more NiFi
+ * <tt>FlowFileProcessor</tt>s, <tt>FlowFileComparator</tt>s and their
+ * dependencies) from other such bundles; this allows for dependencies and
+ * processors that require conflicting, incompatible versions of the same
+ * dependency to run in a single instance of NiFi.</p>
+ *
+ * <p>
+ * <tt>NarClassLoader</tt> follows the delegation model described in
+ * {@link ClassLoader#findClass(String) ClassLoader.findClass(...)};
+ * classes are first loaded from the parent <tt>ClassLoader</tt>, and only if
+ * they cannot be found there does the <tt>NarClassLoader</tt> provide a
+ * definition. Specifically, this means that resources are loaded from NiFi's
+ * <tt>conf</tt>
+ * and <tt>lib</tt> directories first, and if they cannot be found there, are
+ * loaded from the NAR.</p>
+ *
+ * <p>
+ * The packaging of a NAR is such that it is a ZIP file with the following
+ * directory structure:
+ *
+ * <pre>
+ * +META-INF/
+ *   +-- bundled-dependencies/
+ *       +-- &lt;JAR files&gt;
+ *   +-- MANIFEST.MF
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * The MANIFEST.MF file contains the same information as a typical JAR file but
+ * also includes two additional NiFi properties: {@code Nar-Id} and
+ * {@code Nar-Dependency-Id}.
+ * </p>
+ *
+ * <p>
+ * The {@code Nar-Id} provides a unique identifier for this NAR.
+ * </p>
+ *
+ * <p>
+ * The {@code Nar-Dependency-Id} is optional. If provided, it indicates that
+ * this NAR should inherit all of the dependencies of the NAR with the provided
+ * ID. Often times, the NAR that is depended upon is referred to as the Parent.
+ * This is because its ClassLoader will be the parent ClassLoader of the
+ * dependent NAR.
+ * </p>
+ *
+ * <p>
+ * If a NAR is built using NiFi's Maven NAR Plugin, the {@code Nar-Id} property
+ * will be set to the artifactId of the NAR. The {@code Nar-Dependency-Id} will
+ * be set to the artifactId of the NAR that is depended upon. For example, if
+ * NAR A is defined as such:
+ *
+ * <pre>
+ * ...
+ * &lt;artifactId&gt;nar-a&lt;/artifactId&gt;
+ * &lt;packaging&gt;nar&lt;/packaging&gt;
+ * ...
+ * &lt;dependencies&gt;
+ *   &lt;dependency&gt;
+ *     &lt;groupId&gt;group&lt;/groupId&gt;
+ *     &lt;artifactId&gt;nar-z&lt;/artifactId&gt;
+ *     <b>&lt;type&gt;nar&lt;/type&gt;</b>
+ *   &lt;/dependency&gt;
+ * &lt;/dependencies&gt;
+ * </pre>
+ * </p>
+ *
+ *
+ * <p>
+ * Then the MANIFEST.MF file that is created for NAR A will have the following
+ * properties set:
+ * <ul>
+ * <li>{@code Nar-Id: nar-a}</li>
+ * <li>{@code Nar-Dependency-Id: nar-z}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * Note, above, that the {@code type} of the dependency is set to {@code nar}.
+ * </p>
+ *
+ * <p>
+ * If the NAR has more than one dependency of {@code type} {@code nar}, then the
+ * Maven NAR plugin will fail to build the NAR.
+ * </p>
+ */
+public class NarClassLoader extends URLClassLoader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NarClassLoader.class);
+
+    /**
+     * Location, relative to the NAR working directory, of the libraries bundled with the NAR.
+     */
+    private static final String BUNDLED_DEPENDENCIES = "META-INF/bundled-dependencies";
+
+    /**
+     * Accepts regular files whose name ends with <tt>.jar</tt>, ignoring case.
+     */
+    private static final FileFilter JAR_FILTER = new FileFilter() {
+        @Override
+        public boolean accept(File pathname) {
+            final String nameToTest = pathname.getName().toLowerCase();
+            return nameToTest.endsWith(".jar") && pathname.isFile();
+        }
+    };
+
+    /**
+     * The NAR for which this <tt>ClassLoader</tt> is responsible.
+     */
+    private final File narWorkingDirectory;
+
+    /**
+     * Construct a nar class loader that uses the default parent class loader of
+     * {@link URLClassLoader}.
+     *
+     * @param narWorkingDirectory root directory of the unpacked NAR; its contents are
+     * added to this loader's search path
+     * @throws ClassNotFoundException declared in the signature, but nothing in this
+     * class throws it
+     * @throws IOException if the search path cannot be built, e.g. the
+     * bundled-dependencies directory cannot be listed
+     */
+    public NarClassLoader(final File narWorkingDirectory) throws ClassNotFoundException, IOException {
+        super(new URL[0]);
+        this.narWorkingDirectory = narWorkingDirectory;
+
+        // process the classpath
+        updateClasspath(narWorkingDirectory);
+    }
+
+    /**
+     * Construct a nar class loader with the specific parent.
+     *
+     * @param narWorkingDirectory root directory of the unpacked NAR; its contents are
+     * added to this loader's search path
+     * @param parentClassLoader parent class loader of this nar
+     * @throws ClassNotFoundException declared in the signature, but nothing in this
+     * class throws it
+     * @throws IOException if the search path cannot be built, e.g. the
+     * bundled-dependencies directory cannot be listed
+     */
+    public NarClassLoader(final File narWorkingDirectory, final ClassLoader parentClassLoader) throws ClassNotFoundException, IOException {
+        super(new URL[0], parentClassLoader);
+        this.narWorkingDirectory = narWorkingDirectory;
+
+        // process the classpath
+        updateClasspath(narWorkingDirectory);
+    }
+
+    /**
+     * @return the directory this class loader was constructed with
+     */
+    public File getWorkingDirectory() {
+        return narWorkingDirectory;
+    }
+
+    /**
+     * Adds URLs for the resources unpacked from this NAR:
+     * <ul><li>the root: for classes, <tt>META-INF</tt>, etc.</li>
+     * <li><tt>META-INF/bundled-dependencies</tt>: for config files, <tt>.so</tt>s,
+     * etc.</li>
+     * <li><tt>META-INF/bundled-dependencies/*.jar</tt>: for dependent
+     * libraries</li></ul>
+     *
+     * A missing <tt>META-INF/bundled-dependencies</tt> directory is only logged as
+     * a warning; its URL is still added.
+     *
+     * @param root the root directory of the unpacked NAR.
+     * @throws IOException if the URL list could not be updated, including the
+     * case where the bundled-dependencies directory exists but cannot be listed.
+     */
+    private void updateClasspath(File root) throws IOException {
+        addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
+
+        final File dependencies = new File(root, BUNDLED_DEPENDENCIES);
+        final boolean dependenciesPresent = dependencies.isDirectory();
+        if (!dependenciesPresent) {
+            LOGGER.warn("{} does not contain {}!", narWorkingDirectory, BUNDLED_DEPENDENCIES);
+        }
+        addURL(dependencies.toURI().toURL());
+        if (!dependenciesPresent) {
+            return;
+        }
+
+        // File.listFiles returns null when an I/O error occurs; report that through the
+        // declared IOException instead of failing with a NullPointerException below.
+        final File[] libJars = dependencies.listFiles(JAR_FILTER);
+        if (libJars == null) {
+            throw new IOException("Unable to list the contents of " + dependencies.getAbsolutePath());
+        }
+        for (final File libJar : libJars) {
+            addURL(libJar.toURI().toURL());
+        }
+    }
+
+    /**
+     * Looks for a native library in the <tt>native</tt> sub-directory of the NAR's
+     * bundled dependencies. The candidate file names are tried in this order:
+     * <tt>lib&lt;libname&gt;.so</tt>, <tt>&lt;libname&gt;.dll</tt>,
+     * <tt>&lt;libname&gt;.so</tt>.
+     *
+     * @param libname the library name
+     * @return the absolute path of the first candidate that exists, or null when
+     * none does
+     */
+    @Override
+    protected String findLibrary(final String libname) {
+        final File dependencies = new File(narWorkingDirectory, BUNDLED_DEPENDENCIES);
+        if (!dependencies.isDirectory()) {
+            LOGGER.warn("{} does not contain {}!", narWorkingDirectory, BUNDLED_DEPENDENCIES);
+        }
+
+        final File nativeDir = new File(dependencies, "native");
+        final String[] candidateNames = {"lib" + libname + ".so", libname + ".dll", libname + ".so"};
+        for (final String candidateName : candidateNames) {
+            final File candidate = new File(nativeDir, candidateName);
+            if (candidate.exists()) {
+                return candidate.getAbsolutePath();
+            }
+        }
+
+        // not found in the nar. try system native dir
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]";
+    }
+}