You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:44 UTC
[08/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don'
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 945c671..b981bde 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -27,11 +27,8 @@ import org.apache.nifi.admin.service.AccountNotFoundException;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.Authority;
-import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
@@ -55,11 +52,6 @@ import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
@@ -72,7 +64,6 @@ import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.user.AccountStatus;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.user.NiFiUserGroup;
@@ -91,6 +82,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
+import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
@@ -100,7 +92,6 @@ import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@@ -125,21 +116,12 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -393,7 +375,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ConnectionDTO>() {
@Override
public boolean isNew() {
return false;
@@ -424,7 +406,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ProcessorDTO>() {
@Override
public boolean isNew() {
return false;
@@ -455,7 +437,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<LabelDTO>() {
@Override
public boolean isNew() {
return false;
@@ -486,7 +468,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<FunnelDTO>() {
@Override
public boolean isNew() {
return false;
@@ -532,7 +514,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<SnippetDTO>() {
@Override
public boolean isNew() {
return false;
@@ -562,7 +544,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<PortDTO>() {
@Override
public boolean isNew() {
return false;
@@ -592,7 +574,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<PortDTO>() {
@Override
public boolean isNew() {
return false;
@@ -622,7 +604,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<RemoteProcessGroupDTO>() {
@Override
public boolean isNew() {
return false;
@@ -649,7 +631,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<RemoteProcessGroupPortDTO>() {
@Override
public boolean isNew() {
return false;
@@ -676,7 +658,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<RemoteProcessGroupPortDTO>() {
@Override
public boolean isNew() {
return false;
@@ -711,7 +693,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save updated controller
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ProcessGroupDTO>() {
@Override
public boolean isNew() {
return false;
@@ -751,7 +733,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ControllerConfigurationDTO>() {
@Override
public boolean isNew() {
return false;
@@ -808,14 +790,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// clear the state for the specified component
processorDAO.clearState(groupId, processorId);
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -836,14 +818,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// clear the state for the specified component
controllerServiceDAO.clearState(controllerServiceId);
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -864,14 +846,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// clear the state for the specified component
reportingTaskDAO.clearState(reportingTaskId);
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -889,14 +871,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -936,14 +918,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -962,14 +944,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -988,14 +970,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1025,14 +1007,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1050,14 +1032,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1075,14 +1057,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1100,14 +1082,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1125,14 +1107,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1161,7 +1143,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ConnectionDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1213,7 +1195,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ProcessorDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1244,7 +1226,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<LabelDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1275,7 +1257,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<FunnelDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1366,7 +1348,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<FlowSnippetDTO>() {
@Override
public boolean isNew() {
return false;
@@ -1396,7 +1378,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false, false));
- return new ConfigurationResult() {
+ return new ConfigurationResult<SnippetDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1426,7 +1408,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<PortDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1456,7 +1438,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<PortDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1486,7 +1468,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ProcessGroupDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1516,7 +1498,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<RemoteProcessGroupDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1588,7 +1570,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<FlowSnippetDTO>() {
@Override
public boolean isNew() {
return false;
@@ -1611,14 +1593,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// create the archive
controllerFacade.createArchive();
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1654,7 +1636,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// save the flow
controllerFacade.save();
- return new ConfigurationResult() {
+ return new ConfigurationResult<ProcessorDTO>() {
@Override
public boolean isNew() {
return false;
@@ -1689,7 +1671,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<ControllerServiceDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1723,7 +1705,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<ControllerServiceDTO>() {
@Override
public boolean isNew() {
return false;
@@ -1750,7 +1732,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public ConfigurationResult<Set<ControllerServiceReferencingComponentDTO>> execute() {
final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
- return new ConfigurationResult() {
+ return new ConfigurationResult<Set<ControllerServiceReferencingComponentDTO>>() {
@Override
public boolean isNew() {
return false;
@@ -1780,14 +1762,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -1815,7 +1797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<ReportingTaskDTO>() {
@Override
public boolean isNew() {
return true;
@@ -1849,7 +1831,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<ReportingTaskDTO>() {
@Override
public boolean isNew() {
return false;
@@ -1879,14 +1861,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
}
- return new ConfigurationResult() {
+ return new ConfigurationResult<Void>() {
@Override
public boolean isNew() {
return false;
}
@Override
- public ControllerConfigurationDTO getConfiguration() {
+ public Void getConfiguration() {
return null;
}
};
@@ -2105,78 +2087,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) {
- ProcessGroupStatusDTO statusReport;
- if (properties.isClusterManager()) {
- final ProcessGroupStatus mergedProcessGroupStatus = clusterManager.getProcessGroupStatus(groupId);
- if (mergedProcessGroupStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for process group %s.", groupId));
- }
- statusReport = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), mergedProcessGroupStatus);
- } else {
- statusReport = controllerFacade.getProcessGroupStatus(groupId);
- }
- return statusReport;
+ return controllerFacade.getProcessGroupStatus(groupId);
}
@Override
public ControllerStatusDTO getControllerStatus() {
- final ControllerStatusDTO controllerStatus;
-
- if (properties.isClusterManager()) {
- final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
-
- if (connectedNodes.isEmpty()) {
- throw new NoConnectedNodesException();
- }
-
- int activeThreadCount = 0;
- long totalFlowFileObjectCount = 0;
- long totalFlowFileByteCount = 0;
- for (final Node node : connectedNodes) {
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- activeThreadCount += nodeHeartbeatPayload.getActiveThreadCount();
- totalFlowFileObjectCount += nodeHeartbeatPayload.getTotalFlowFileCount();
- totalFlowFileByteCount += nodeHeartbeatPayload.getTotalFlowFileBytes();
- }
-
- controllerStatus = new ControllerStatusDTO();
- controllerStatus.setActiveThreadCount(activeThreadCount);
- controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + " / " + FormatUtils.formatDataSize(totalFlowFileByteCount));
-
- final int numNodes = clusterManager.getNodeIds().size();
- controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + numNodes);
-
- // get the bulletins for the controller
- final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository();
- controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
-
- // get the controller service bulletins
- final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
- controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
-
- // get the reporting task bulletins
- final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
- controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
-
- // get the component counts by extracting them from the roots' group status
- final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root");
- if (status != null) {
- final ProcessGroupCounts counts = extractProcessGroupCounts(status);
- controllerStatus.setRunningCount(counts.getRunningCount());
- controllerStatus.setStoppedCount(counts.getStoppedCount());
- controllerStatus.setInvalidCount(counts.getInvalidCount());
- controllerStatus.setDisabledCount(counts.getDisabledCount());
- controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount());
- controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
- }
- } else {
- // get the controller status
- controllerStatus = controllerFacade.getControllerStatus();
- }
+ // get the controller status
+ final ControllerStatusDTO controllerStatus = controllerFacade.getControllerStatus();
// determine if there are any pending user accounts - only include if appropriate
if (NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) {
@@ -2218,53 +2135,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public CountersDTO getCounters() {
- if (properties.isClusterManager()) {
- final Map<String, CounterDTO> mergedCountersMap = new HashMap<>();
- final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
-
- if (connectedNodes.isEmpty()) {
- throw new NoConnectedNodesException();
- }
-
- for (final Node node : connectedNodes) {
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
- final List<Counter> nodeCounters = node.getHeartbeatPayload().getCounters();
- if (nodeCounters == null) {
- continue;
- }
-
- // for each node, add its counter values to the aggregate values
- for (final Counter nodeCounter : nodeCounters) {
- final CounterDTO mergedCounter = mergedCountersMap.get(nodeCounter.getIdentifier());
-
- // either create a new aggregate counter or update the aggregate counter
- if (mergedCounter == null) {
- // add new counter
- mergedCountersMap.put(nodeCounter.getIdentifier(), dtoFactory.createCounterDto(nodeCounter));
- } else {
- // update aggregate counter
- mergedCounter.setValueCount(mergedCounter.getValueCount() + nodeCounter.getValue());
- mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount()));
- }
- }
- }
-
- final CountersDTO mergedCounters = new CountersDTO();
- mergedCounters.setGenerated(new Date());
- mergedCounters.setCounters(mergedCountersMap.values());
- return mergedCounters;
- } else {
- List<Counter> counters = controllerFacade.getCounters();
- Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
- for (Counter counter : counters) {
- counterDTOs.add(dtoFactory.createCounterDto(counter));
- }
- return dtoFactory.createCountersDto(counterDTOs);
+ List<Counter> counters = controllerFacade.getCounters();
+ Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
+ for (Counter counter : counters) {
+ counterDTOs.add(dtoFactory.createCounterDto(counter));
}
+ final CountersSnapshotDTO snapshotDto = dtoFactory.createCountersDto(counterDTOs);
+ final CountersDTO countersDto = new CountersDTO();
+ countersDto.setAggregateSnapshot(snapshotDto);
+
+ return countersDto;
}
@Override
@@ -2308,6 +2189,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ConnectionStatusDTO getConnectionStatus(String groupId, String connectionId) {
+ return controllerFacade.getConnectionStatus(groupId, connectionId);
+ }
+
+ @Override
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
}
@@ -2386,6 +2272,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ProcessorStatusDTO getProcessorStatus(String groupId, String id) {
+ return controllerFacade.getProcessorStatus(groupId, id);
+ }
+
+ @Override
public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) {
return controllerFacade.getProcessorStatusHistory(groupId, id);
}
@@ -2429,18 +2320,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public SystemDiagnosticsDTO getSystemDiagnostics() {
- final SystemDiagnosticsDTO dto;
- if (properties.isClusterManager()) {
- final SystemDiagnostics clusterDiagnostics = clusterManager.getSystemDiagnostics();
- if (clusterDiagnostics == null) {
- throw new IllegalStateException("Nodes are connected but no systems diagnostics have been reported.");
- }
- dto = dtoFactory.createSystemDiagnosticsDto(clusterDiagnostics);
- } else {
- final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
- dto = dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
- }
- return dto;
+ final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
+ return dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
}
/**
@@ -2651,16 +2532,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public PortStatusDTO getInputPortStatus(String groupId, String inputPortId) {
+ return controllerFacade.getInputPortStatus(groupId, inputPortId);
+ }
+
+ @Override
public PortDTO getOutputPort(String groupId, String outputPortId) {
return dtoFactory.createPortDto(outputPortDAO.getPort(groupId, outputPortId));
}
@Override
+ public PortStatusDTO getOutputPortStatus(String groupId, String outputPortId) {
+ return controllerFacade.getOutputPortStatus(groupId, outputPortId);
+ }
+
+ @Override
public RemoteProcessGroupDTO getRemoteProcessGroup(String groupId, String remoteProcessGroupId) {
return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupId));
}
@Override
+ public RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(String groupId, String id) {
+ return controllerFacade.getRemoteProcessGroupStatus(groupId, id);
+ }
+
+ @Override
public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String groupId, String id) {
return controllerFacade.getRemoteProcessGroupStatusHistory(groupId, id);
}
@@ -2934,575 +2830,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
clusterManager.deleteNode(nodeId, userDn);
}
- private ProcessorStatus findNodeProcessorStatus(final ProcessGroupStatus groupStatus, final String processorId) {
- ProcessorStatus processorStatus = null;
-
- for (final ProcessorStatus status : groupStatus.getProcessorStatus()) {
- if (processorId.equals(status.getId())) {
- processorStatus = status;
- break;
- }
- }
-
- if (processorStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- processorStatus = findNodeProcessorStatus(status, processorId);
-
- if (processorStatus != null) {
- break;
- }
- }
- }
-
- return processorStatus;
- }
-
- // TODO Refactor!!!
- @Override
- public ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId) {
-
- final ClusterProcessorStatusDTO clusterProcessorStatusDto = new ClusterProcessorStatusDTO();
- clusterProcessorStatusDto.setNodeProcessorStatus(new ArrayList<NodeProcessorStatusDTO>());
-
- // set the current time
- clusterProcessorStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // attempt to find the processor stats for this node
- final ProcessorStatus processorStatus = findNodeProcessorStatus(nodeStats, processorId);
-
- // sanity check that we have status for this processor
- if (processorStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for processor id '%s'.", processorId));
- }
-
- if (firstNode) {
- clusterProcessorStatusDto.setProcessorId(processorId);
- clusterProcessorStatusDto.setProcessorName(processorStatus.getName());
- clusterProcessorStatusDto.setProcessorType(processorStatus.getType());
- clusterProcessorStatusDto.setProcessorRunStatus(processorStatus.getRunStatus().toString());
- firstNode = false;
- }
-
- // create node processor status dto
- final NodeProcessorStatusDTO nodeProcessorStatusDTO = new NodeProcessorStatusDTO();
- clusterProcessorStatusDto.getNodeProcessorStatus().add(nodeProcessorStatusDTO);
-
- // populate node processor status dto
- final String nodeId = node.getNodeId().getId();
- nodeProcessorStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeProcessorStatusDTO.setProcessorStatus(dtoFactory.createProcessorStatusDto(processorStatus));
-
- }
-
- return clusterProcessorStatusDto;
- }
-
- private ConnectionStatus findNodeConnectionStatus(final ProcessGroupStatus groupStatus, final String connectionId) {
- ConnectionStatus connectionStatus = null;
-
- for (final ConnectionStatus status : groupStatus.getConnectionStatus()) {
- if (connectionId.equals(status.getId())) {
- connectionStatus = status;
- break;
- }
- }
-
- if (connectionStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- connectionStatus = findNodeConnectionStatus(status, connectionId);
-
- if (connectionStatus != null) {
- break;
- }
- }
- }
-
- return connectionStatus;
- }
-
- @Override
- public ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId) {
- final ClusterConnectionStatusDTO clusterConnectionStatusDto = new ClusterConnectionStatusDTO();
- clusterConnectionStatusDto.setNodeConnectionStatus(new ArrayList<NodeConnectionStatusDTO>());
-
- // set the current time
- clusterConnectionStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // find the connection status for this node
- final ConnectionStatus connectionStatus = findNodeConnectionStatus(nodeStats, connectionId);
-
- // sanity check that we have status for this connection
- if (connectionStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for connection id '%s'.", connectionId));
- }
-
- if (firstNode) {
- clusterConnectionStatusDto.setConnectionId(connectionId);
- clusterConnectionStatusDto.setConnectionName(connectionStatus.getName());
- firstNode = false;
- }
-
- // create node connection status dto
- final NodeConnectionStatusDTO nodeConnectionStatusDTO = new NodeConnectionStatusDTO();
- clusterConnectionStatusDto.getNodeConnectionStatus().add(nodeConnectionStatusDTO);
-
- // populate node processor status dto
- final String nodeId = node.getNodeId().getId();
- nodeConnectionStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeConnectionStatusDTO.setConnectionStatus(dtoFactory.createConnectionStatusDto(connectionStatus));
-
- }
-
- return clusterConnectionStatusDto;
- }
-
- private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) {
- ProcessGroupStatus processGroupStatus = null;
-
- if (processGroupId.equals(groupStatus.getId())) {
- processGroupStatus = groupStatus;
- }
-
- if (processGroupStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- processGroupStatus = findNodeProcessGroupStatus(status, processGroupId);
-
- if (processGroupStatus != null) {
- break;
- }
- }
- }
-
- return processGroupStatus;
- }
-
- @Override
- public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) {
-
- final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO();
- clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>());
-
- // set the current time
- clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // attempt to find the process group stats for this node
- final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId);
-
- // sanity check that we have status for this process group
- if (processGroupStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId));
- }
-
- if (firstNode) {
- clusterProcessGroupStatusDto.setProcessGroupId(processGroupId);
- clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName());
- firstNode = false;
- }
-
- // create node process group status dto
- final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO();
- clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO);
-
- // populate node process group status dto
- final String nodeId = node.getNodeId().getId();
- nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus));
-
- }
-
- return clusterProcessGroupStatusDto;
- }
-
- private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) {
- PortStatus portStatus = null;
-
- for (final PortStatus status : groupStatus.getInputPortStatus()) {
- if (inputPortId.equals(status.getId())) {
- portStatus = status;
- break;
- }
- }
-
- if (portStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- portStatus = findNodeInputPortStatus(status, inputPortId);
-
- if (portStatus != null) {
- break;
- }
- }
- }
-
- return portStatus;
- }
-
- @Override
- public ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId) {
- final ClusterPortStatusDTO clusterInputPortStatusDto = new ClusterPortStatusDTO();
- clusterInputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>());
-
- // set the current time
- clusterInputPortStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // find the input status for this node
- final PortStatus inputPortStatus = findNodeInputPortStatus(nodeStats, inputPortId);
-
- // sanity check that we have status for this input port
- if (inputPortStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for input port id '%s'.", inputPortId));
- }
-
- if (firstNode) {
- clusterInputPortStatusDto.setPortId(inputPortId);
- clusterInputPortStatusDto.setPortName(inputPortStatus.getName());
- firstNode = false;
- }
-
- // create node port status dto
- final NodePortStatusDTO nodeInputPortStatusDTO = new NodePortStatusDTO();
- clusterInputPortStatusDto.getNodePortStatus().add(nodeInputPortStatusDTO);
-
- // populate node input port status dto
- final String nodeId = node.getNodeId().getId();
- nodeInputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeInputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(inputPortStatus));
- }
-
- return clusterInputPortStatusDto;
- }
-
- private PortStatus findNodeOutputPortStatus(final ProcessGroupStatus groupStatus, final String outputPortId) {
- PortStatus portStatus = null;
-
- for (final PortStatus status : groupStatus.getOutputPortStatus()) {
- if (outputPortId.equals(status.getId())) {
- portStatus = status;
- break;
- }
- }
-
- if (portStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- portStatus = findNodeOutputPortStatus(status, outputPortId);
-
- if (portStatus != null) {
- break;
- }
- }
- }
-
- return portStatus;
- }
-
- @Override
- public ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId) {
- final ClusterPortStatusDTO clusterOutputPortStatusDto = new ClusterPortStatusDTO();
- clusterOutputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>());
-
- // set the current time
- clusterOutputPortStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // find the output status for this node
- final PortStatus outputPortStatus = findNodeOutputPortStatus(nodeStats, outputPortId);
-
- // sanity check that we have status for this output port
- if (outputPortStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for output port id '%s'.", outputPortId));
- }
-
- if (firstNode) {
- clusterOutputPortStatusDto.setPortId(outputPortId);
- clusterOutputPortStatusDto.setPortName(outputPortStatus.getName());
- firstNode = false;
- }
-
- // create node port status dto
- final NodePortStatusDTO nodeOutputPortStatusDTO = new NodePortStatusDTO();
- clusterOutputPortStatusDto.getNodePortStatus().add(nodeOutputPortStatusDTO);
-
- // populate node output port status dto
- final String nodeId = node.getNodeId().getId();
- nodeOutputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeOutputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(outputPortStatus));
- }
-
- return clusterOutputPortStatusDto;
- }
-
- private RemoteProcessGroupStatus findNodeRemoteProcessGroupStatus(final ProcessGroupStatus groupStatus, final String remoteProcessGroupId) {
- RemoteProcessGroupStatus remoteProcessGroupStatus = null;
-
- for (final RemoteProcessGroupStatus status : groupStatus.getRemoteProcessGroupStatus()) {
- if (remoteProcessGroupId.equals(status.getId())) {
- remoteProcessGroupStatus = status;
- break;
- }
- }
-
- if (remoteProcessGroupStatus == null) {
- for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
- remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(status, remoteProcessGroupId);
-
- if (remoteProcessGroupStatus != null) {
- break;
- }
- }
- }
-
- return remoteProcessGroupStatus;
- }
-
- @Override
- public ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId) {
- final ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatusDto = new ClusterRemoteProcessGroupStatusDTO();
- clusterRemoteProcessGroupStatusDto.setNodeRemoteProcessGroupStatus(new ArrayList<NodeRemoteProcessGroupStatusDTO>());
-
- // set the current time
- clusterRemoteProcessGroupStatusDto.setStatsLastRefreshed(new Date());
-
- final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
- boolean firstNode = true;
- for (final Node node : nodes) {
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
- continue;
- }
-
- // find the remote process group for this node
- final RemoteProcessGroupStatus remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(nodeStats, remoteProcessGroupId);
-
- // sanity check that we have status for this remote process group
- if (remoteProcessGroupStatus == null) {
- throw new ResourceNotFoundException(String.format("Unable to find status for remote process group id '%s'.", remoteProcessGroupId));
- }
-
- if (firstNode) {
- clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupId(remoteProcessGroupId);
- clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupName(remoteProcessGroupStatus.getName());
- firstNode = false;
- }
-
- // create node remote process group status dto
- final NodeRemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatusDTO = new NodeRemoteProcessGroupStatusDTO();
- clusterRemoteProcessGroupStatusDto.getNodeRemoteProcessGroupStatus().add(nodeRemoteProcessGroupStatusDTO);
-
- // populate node remote process group status dto
- final String nodeId = node.getNodeId().getId();
- nodeRemoteProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
- nodeRemoteProcessGroupStatusDTO.setRemoteProcessGroupStatus(dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus));
- }
-
- return clusterRemoteProcessGroupStatusDto;
- }
-
- @Override
- public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId) {
- return clusterManager.getProcessorStatusHistory(processorId);
- }
-
- @Override
- public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId) {
- return clusterManager.getConnectionStatusHistory(connectionId);
- }
-
- @Override
- public ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId) {
- return clusterManager.getProcessGroupStatusHistory(processGroupId);
- }
-
- @Override
- public ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId) {
- return clusterManager.getRemoteProcessGroupStatusHistory(remoteProcessGroupId);
- }
-
- @Override
- public NodeStatusDTO getNodeStatus(String nodeId) {
- // find the node in question
- final Node node = clusterManager.getNode(nodeId);
-
- // verify node state
- if (node == null) {
- throw new UnknownNodeException("Node does not exist.");
- } else if (Node.Status.CONNECTED != node.getStatus()) {
- throw new IllegalClusterStateException(
- String.format("Node '%s:%s' is not connected to the cluster.",
- node.getNodeId().getApiAddress(), node.getNodeId().getApiPort()));
- }
-
- // get the node's last heartbeat
- final NodeStatusDTO nodeStatus = new NodeStatusDTO();
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- return nodeStatus;
- }
-
- // get the node status
- final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeProcessGroupStatus == null) {
- return nodeStatus;
- }
-
- final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus);
- nodeStatus.setControllerStatus(nodeProcessGroupStatusDto);
- nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-
- return nodeStatus;
- }
-
- @Override
- public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId) {
- // find the node in question
- final Node node = clusterManager.getNode(nodeId);
-
- // verify node state
- if (node == null) {
- throw new UnknownNodeException("Node does not exist.");
- } else if (Node.Status.CONNECTED != node.getStatus()) {
- throw new IllegalClusterStateException(
- String.format("Node '%s:%s' is not connected to the cluster.",
- node.getNodeId().getApiAddress(), node.getNodeId().getApiPort()));
- }
-
- // get the node's last heartbeat
- final NodeSystemDiagnosticsDTO nodeStatus = new NodeSystemDiagnosticsDTO();
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- return nodeStatus;
- }
-
- // get the node status
- final SystemDiagnostics nodeSystemDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics();
- if (nodeSystemDiagnostics == null) {
- return nodeStatus;
- }
-
- // populate the dto
- nodeStatus.setControllerStatus(dtoFactory.createSystemDiagnosticsDto(nodeSystemDiagnostics));
- nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-
- return nodeStatus;
- }
-
- @Override
- public ClusterStatusDTO getClusterStatus() {
-
- // create cluster status dto
- final ClusterStatusDTO clusterStatusDto = new ClusterStatusDTO();
-
- // populate node status dtos
- final Collection<NodeStatusDTO> nodeStatusDtos = new ArrayList<>();
- clusterStatusDto.setNodeStatus(nodeStatusDtos);
-
- for (final Node node : clusterManager.getNodes()) {
-
- if (Node.Status.CONNECTED != node.getStatus()) {
- continue;
- }
-
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
-
- final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
- if (nodeProcessGroupStatus == null) {
- continue;
- }
-
- final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus);
-
- // create node status dto
- final NodeStatusDTO nodeStatusDto = new NodeStatusDTO();
- nodeStatusDtos.add(nodeStatusDto);
-
- // populate the status
- nodeStatusDto.setControllerStatus(nodeProcessGroupStatusDto);
-
- // create and add node dto
- final String nodeId = node.getNodeId().getId();
- nodeStatusDto.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
-
- }
-
- return clusterStatusDto;
- }
-
@Override
public ProcessorDTO getProcessor(String id) {
ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -3650,91 +2977,4 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return date1;
}
}
-
- /**
- * Utility method for extracting component counts from the specified group status.
- */
- private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) {
- int running = 0;
- int stopped = 0;
- int invalid = 0;
- int disabled = 0;
- int activeRemotePorts = 0;
- int inactiveRemotePorts = 0;
-
- for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) {
- switch (processorStatus.getRunStatus()) {
- case Disabled:
- disabled++;
- break;
- case Running:
- running++;
- break;
- case Invalid:
- invalid++;
- break;
- default:
- stopped++;
- break;
- }
- }
-
- for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
- switch (portStatus.getRunStatus()) {
- case Disabled:
- disabled++;
- break;
- case Running:
- running++;
- break;
- case Invalid:
- invalid++;
- break;
- default:
- stopped++;
- break;
- }
- }
-
- for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
- switch (portStatus.getRunStatus()) {
- case Disabled:
- disabled++;
- break;
- case Running:
- running++;
- break;
- case Invalid:
- invalid++;
- break;
- default:
- stopped++;
- break;
- }
- }
-
- for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
- if (remoteStatus.getActiveRemotePortCount() != null) {
- activeRemotePorts += remoteStatus.getActiveRemotePortCount();
- }
- if (remoteStatus.getInactiveRemotePortCount() != null) {
- inactiveRemotePorts += remoteStatus.getInactiveRemotePortCount();
- }
- if (CollectionUtils.isNotEmpty(remoteStatus.getAuthorizationIssues())) {
- invalid++;
- }
- }
-
- for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
- final ProcessGroupCounts childCounts = extractProcessGroupCounts(childGroupStatus);
- running += childCounts.getRunningCount();
- stopped += childCounts.getStoppedCount();
- invalid += childCounts.getInvalidCount();
- disabled += childCounts.getDisabledCount();
- activeRemotePorts += childCounts.getActiveRemotePortCount();
- inactiveRemotePorts += childCounts.getInactiveRemotePortCount();
- }
-
- return new ProcessGroupCounts(0, 0, running, stopped, invalid, disabled, activeRemotePorts, inactiveRemotePorts);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 6f895b8..3b429e7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -79,6 +79,8 @@ public abstract class ApplicationResource {
private static final int CLUSTER_CONTEXT_HEADER_VALUE_MAX_BYTES = (int) (0.75 * HEADER_BUFFER_SIZE);
private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class);
+ public static final String NODEWISE = "false";
+
@Context
private HttpServletRequest httpServletRequest;
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
index 6197953..d13b5c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/BulletinBoardResource.java
@@ -16,20 +16,19 @@
*/
package org.apache.nifi.web.api;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
@@ -39,18 +38,23 @@ import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+
/**
* RESTful endpoint for managing a Template.
*/
@Api(hidden = true)
public class BulletinBoardResource extends ApplicationResource {
- private static final Logger logger = LoggerFactory.getLogger(BulletinBoardResource.class);
+ private NiFiProperties properties;
+ private WebClusterManager clusterManager;
private NiFiServiceFacade serviceFacade;
@@ -128,6 +132,11 @@ public class BulletinBoardResource extends ApplicationResource {
)
@QueryParam("limit") IntegerParameter limit) {
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
// build the bulletin query
final BulletinQueryDTO query = new BulletinQueryDTO();
@@ -171,4 +180,11 @@ public class BulletinBoardResource extends ApplicationResource {
this.serviceFacade = serviceFacade;
}
+ public void setClusterManager(WebClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
+ }
+
+ public void setProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
}