You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:47 UTC
[11/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don't ... [subject truncated]
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index c0f4c63..303e98e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -29,8 +29,6 @@ import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -73,7 +71,6 @@ import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
@@ -88,6 +85,7 @@ import org.apache.nifi.cluster.manager.HttpClusterManager;
import org.apache.nifi.cluster.manager.HttpRequestReplicator;
import org.apache.nifi.cluster.manager.HttpResponseMapper;
import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.StatusMerger;
import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
@@ -109,7 +107,6 @@ import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -121,7 +118,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -130,7 +126,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardFlowSerializer;
@@ -153,16 +148,14 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.SortedStateUtils;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
-import org.apache.nifi.diagnostics.GarbageCollection;
-import org.apache.nifi.diagnostics.StorageUsage;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
@@ -178,7 +171,6 @@ import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.remote.RemoteResourceManager;
@@ -188,7 +180,9 @@ import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
@@ -202,14 +196,18 @@ import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.BulletinBoardDTO;
+import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
+import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.QueueSizeDTO;
@@ -219,30 +217,53 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.StateEntryDTO;
import org.apache.nifi.web.api.dto.StateMapDTO;
+import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
+import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.BulletinBoardEntity;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ControllerStatusEntity;
+import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowSnippetEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
+import org.apache.nifi.web.api.entity.StatusHistoryEntity;
+import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -310,10 +331,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
*/
private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
- public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+ public static final Pattern PROCESSOR_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status");
public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state");
public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
@@ -321,6 +342,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+ public static final Pattern GROUP_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status");
+ public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status");
public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
@@ -328,7 +351,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
- public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state");
@@ -336,6 +358,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
+ public static final Pattern BULLETIN_BOARD_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletin-board");
+ public static final Pattern SYSTEM_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/system-diagnostics");
+ public static final Pattern COUNTERS_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters");
+ public static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
+
+ public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN =
+ Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/status/history");
+ public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
+ public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern
+ .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status/history");
+ public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern
+ .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status/history");
+
+ public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status");
+ public static final Pattern INPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/input-ports/[a-f0-9\\-]{36}/status");
+ public static final Pattern OUTPUT_PORT_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/output-ports/[a-f0-9\\-]{36}/status");
+ public static final Pattern REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN =
+ Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}/status");
@Deprecated
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
@@ -378,7 +418,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final BulletinRepository bulletinRepository;
private final String instanceId;
private final FlowEngine reportingTaskEngine;
- private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>();
private final StandardProcessScheduler processScheduler;
private final StateManagerProvider stateManagerProvider;
private final long componentStatusSnapshotMillis;
@@ -451,11 +490,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
throw new RuntimeException(e);
}
- processScheduler = new StandardProcessScheduler(new Heartbeater() {
- @Override
- public void heartbeat() {
- }
- }, this, encryptor, stateManagerProvider);
+ processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider);
// When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
// going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
@@ -463,7 +498,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
- processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
}
@@ -620,7 +654,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return MessageType.CONNECTION_REQUEST == msg.getType()
|| MessageType.HEARTBEAT == msg.getType()
|| MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
- || MessageType.BULLETINS == msg.getType()
|| MessageType.RECONNECTION_FAILURE == msg.getType();
}
@@ -654,10 +687,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
return null;
- case BULLETINS:
- final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage;
- handleBulletins(bulletinsMessage.getBulletins());
- return null;
default:
throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
}
@@ -1686,22 +1715,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
processScheduler.enableReportingTask(reportingTask);
}
- /**
- * Handle a bulletins message.
- *
- * @param bulletins bulletins
- */
- public void handleBulletins(final NodeBulletins bulletins) {
- final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
- final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
-
- // unmarshal the message
- final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
- for (final Bulletin bulletin : payload.getBulletins()) {
- bulletin.setNodeAddress(nodeAddress);
- bulletinRepository.addBulletin(bulletin);
- }
- }
/**
* Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
@@ -1875,20 +1888,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
- private ComponentStatusRepository createComponentStatusRepository() {
- final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
- if (implementationClassName == null) {
- throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
- + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
- }
-
- try {
- return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Override
public Set<Node> getNodes(final Status... statuses) {
final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2434,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return false;
}
+ private static boolean isProcessorStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
private static boolean isProcessorStateEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
}
@@ -2442,6 +2445,30 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
}
+ private static boolean isConnectionStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isInputPortStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && INPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isOutputPortStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && OUTPUT_PORT_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isRemoteProcessGroupStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isGroupStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && GROUP_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isControllerStatusEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
private static boolean isTemplateEndpoint(final URI uri, final String method) {
return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
}
@@ -2454,6 +2481,35 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
}
+ private static boolean isProcessorStatusHistoryEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && PROCESSOR_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isProcessGroupStatusHistoryEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isRemoteProcessGroupStatusHistoryEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isConnectionStatusHistoryEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_HISTORY_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isBulletinBoardEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && BULLETIN_BOARD_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isSystemDiagnosticsEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && SYSTEM_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ private static boolean isCountersEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && COUNTERS_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+
private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@@ -2487,8 +2543,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return false;
}
- private static boolean isCountersEndpoint(final URI uri) {
- return COUNTERS_URI.matcher(uri.getPath()).matches();
+ private static boolean isCounterEndpoint(final URI uri) {
+ return COUNTER_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
@@ -2556,7 +2612,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method)
|| isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method)
|| isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method)
- || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method);
+ || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method)
+ || isGroupStatusEndpoint(uri, method) || isProcessorStatusEndpoint(uri, method) || isControllerStatusEndpoint(uri, method)
+ || isConnectionStatusEndpoint(uri, method) || isRemoteProcessGroupStatusEndpoint(uri, method)
+ || isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method)
+ || isProcessorStatusHistoryEndpoint(uri, method) || isProcessGroupStatusHistoryEndpoint(uri, method)
+ || isRemoteProcessGroupStatusHistoryEndpoint(uri, method) || isConnectionStatusHistoryEndpoint(uri, method)
+ || isBulletinBoardEndpoint(uri, method) || isSystemDiagnosticsEndpoint(uri, method)
+ || isCountersEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2608,6 +2671,303 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
componentState.getLocalState().setState(localStateEntries);
}
+
+    /**
+     * Builds the cluster-wide system diagnostics inside {@code target}.
+     * <p>
+     * {@code target} is the diagnostics DTO returned by the selected node. A clone of its aggregate
+     * snapshot is stored as that node's entry in a fresh node-snapshot list, after which every other
+     * entry of {@code resultMap} is passed to {@link StatusMerger} along with the reporting node's
+     * id, API address and API port.
+     *
+     * @param target diagnostics of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code target}
+     * @param resultMap diagnostics keyed by the node that reported them
+     */
+    private void mergeSystemDiagnostics(final SystemDiagnosticsDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, SystemDiagnosticsDTO> resultMap) {
+        target.setNodeSnapshots(new ArrayList<NodeSystemDiagnosticsSnapshotDTO>());
+
+        // Capture the selected node's own numbers before anything else is merged into the target.
+        final NodeSystemDiagnosticsSnapshotDTO ownSnapshot = new NodeSystemDiagnosticsSnapshotDTO();
+        ownSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        target.getNodeSnapshots().add(ownSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, SystemDiagnosticsDTO> nodeEntry : resultMap.entrySet()) {
+            final SystemDiagnosticsDTO nodeDiagnostics = nodeEntry.getValue();
+
+            // The target itself is already accounted for; only the remaining nodes are merged.
+            if (nodeDiagnostics != target) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(target, nodeDiagnostics, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide counters inside {@code target}.
+     * <p>
+     * {@code target} is the counters DTO returned by the selected node. A clone of its aggregate
+     * snapshot becomes that node's entry in a fresh node-snapshot list; every other entry of
+     * {@code resultMap} is then passed to {@link StatusMerger} along with the reporting node's id,
+     * API address and API port.
+     *
+     * @param target counters of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code target}
+     * @param resultMap counters keyed by the node that reported them
+     */
+    private void mergeCounters(final CountersDTO target, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, CountersDTO> resultMap) {
+        target.setNodeSnapshots(new ArrayList<NodeCountersSnapshotDTO>());
+
+        // Capture the selected node's own counters before anything else is merged into the target.
+        final NodeCountersSnapshotDTO ownSnapshot = new NodeCountersSnapshotDTO();
+        ownSnapshot.setSnapshot(target.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        target.getNodeSnapshots().add(ownSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, CountersDTO> nodeEntry : resultMap.entrySet()) {
+            final CountersDTO nodeCounters = nodeEntry.getValue();
+
+            // The target itself is already accounted for; only the remaining nodes are merged.
+            if (nodeCounters != target) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(target, nodeCounters, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide process group status inside {@code statusDto}.
+     * <p>
+     * {@code statusDto} is the status returned by the selected node. A clone of its aggregate snapshot
+     * becomes that node's entry in a fresh node-snapshot list. For every other node in
+     * {@code resultMap}, each remote process group authorization issue is prefixed with
+     * {@code [address:port] -- } so its origin stays visible, and the node's status is then passed to
+     * {@link StatusMerger}.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code statusDto}
+     * @param resultMap process group status keyed by the node that reported it
+     */
+    private void mergeGroupStatus(final ProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessGroupStatusDTO> resultMap) {
+        final ProcessGroupStatusDTO mergedProcessGroupStatus = statusDto;
+        mergedProcessGroupStatus.setNodeSnapshots(new ArrayList<NodeProcessGroupStatusSnapshotDTO>());
+
+        final NodeProcessGroupStatusSnapshotDTO selectedNodeSnapshot = new NodeProcessGroupStatusSnapshotDTO();
+        selectedNodeSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
+        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
+        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
+
+        mergedProcessGroupStatus.getNodeSnapshots().add(selectedNodeSnapshot);
+
+        for (final Map.Entry<NodeIdentifier, ProcessGroupStatusDTO> entry : resultMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ProcessGroupStatusDTO nodeProcessGroupStatus = entry.getValue();
+            if (nodeProcessGroupStatus == mergedProcessGroupStatus) {
+                continue;
+            }
+
+            final ProcessGroupStatusSnapshotDTO nodeSnapshot = nodeProcessGroupStatus.getAggregateSnapshot();
+
+            // A group without remote process groups may carry no collection at all, so guard before iterating.
+            // NOTE(review): whether a deserialized DTO can really hold null here was not verified - confirm.
+            if (nodeSnapshot.getRemoteProcessGroupStatusSnapshots() != null) {
+                final String nodeLabel = "[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- ";
+
+                for (final RemoteProcessGroupStatusSnapshotDTO remoteProcessGroupStatus : nodeSnapshot.getRemoteProcessGroupStatusSnapshots()) {
+                    final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
+                    if (nodeAuthorizationIssues == null || nodeAuthorizationIssues.isEmpty()) {
+                        continue;
+                    }
+
+                    // Rewrite each issue in place so it names the node that reported it.
+                    for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
+                        final String issue = iter.next();
+                        iter.set(nodeLabel + issue);
+                    }
+                    remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
+                }
+            }
+
+            StatusMerger.merge(mergedProcessGroupStatus, nodeProcessGroupStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
+        }
+    }
+
+
+    /**
+     * Builds the cluster-wide processor status inside {@code statusDto}.
+     * <p>
+     * {@code statusDto} is the status returned by the selected node. A clone of its aggregate snapshot
+     * becomes that node's entry in a fresh node-snapshot list; every other entry of {@code resultMap}
+     * is then passed to {@link StatusMerger} along with the reporting node's id, API address and API port.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code statusDto}
+     * @param resultMap processor status keyed by the node that reported it
+     */
+    private void mergeProcessorStatus(final ProcessorStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ProcessorStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeProcessorStatusSnapshotDTO>());
+
+        // Capture the selected node's own status before anything else is merged into it.
+        final NodeProcessorStatusSnapshotDTO ownSnapshot = new NodeProcessorStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // Fold in every node except the one whose response is the merge target.
+        for (final Map.Entry<NodeIdentifier, ProcessorStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final ProcessorStatusDTO reportedStatus = nodeEntry.getValue();
+            if (reportedStatus != statusDto) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, reportedStatus, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide connection status inside {@code statusDto}.
+     * <p>
+     * {@code statusDto} is the status returned by the selected node. A clone of its aggregate snapshot
+     * becomes that node's entry in a fresh node-snapshot list; every other entry of {@code resultMap}
+     * is then passed to {@link StatusMerger} along with the reporting node's id, API address and API port.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code statusDto}
+     * @param resultMap connection status keyed by the node that reported it
+     */
+    private void mergeConnectionStatus(final ConnectionStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, ConnectionStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>());
+
+        // Capture the selected node's own status before anything else is merged into it.
+        final NodeConnectionStatusSnapshotDTO ownSnapshot = new NodeConnectionStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // Fold in every node except the one whose response is the merge target.
+        for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final ConnectionStatusDTO reportedStatus = nodeEntry.getValue();
+            if (reportedStatus != statusDto) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, reportedStatus, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide port status inside {@code statusDto}.
+     * <p>
+     * {@code statusDto} is the status returned by the selected node. A clone of its aggregate snapshot
+     * becomes that node's entry in a fresh node-snapshot list; every other entry of {@code resultMap}
+     * is then passed to {@link StatusMerger} along with the reporting node's id, API address and API port.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code statusDto}
+     * @param resultMap port status keyed by the node that reported it
+     */
+    private void mergePortStatus(final PortStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, PortStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodePortStatusSnapshotDTO>());
+
+        // Capture the selected node's own status before anything else is merged into it.
+        final NodePortStatusSnapshotDTO ownSnapshot = new NodePortStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // Fold in every node except the one whose response is the merge target.
+        for (final Map.Entry<NodeIdentifier, PortStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final PortStatusDTO reportedStatus = nodeEntry.getValue();
+            if (reportedStatus != statusDto) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, reportedStatus, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide remote process group status inside {@code statusDto}.
+     * <p>
+     * {@code statusDto} is the status returned by the selected node. A clone of its aggregate snapshot
+     * becomes that node's entry in a fresh node-snapshot list; every other entry of {@code resultMap}
+     * is then passed to {@link StatusMerger} along with the reporting node's id, API address and API port.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param selectedNodeId identifier of the node that produced {@code statusDto}
+     * @param resultMap remote process group status keyed by the node that reported it
+     */
+    private void mergeRemoteProcessGroupStatus(final RemoteProcessGroupStatusDTO statusDto, final NodeIdentifier selectedNodeId, final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultMap) {
+        statusDto.setNodeSnapshots(new ArrayList<NodeRemoteProcessGroupStatusSnapshotDTO>());
+
+        // Capture the selected node's own status before anything else is merged into it.
+        final NodeRemoteProcessGroupStatusSnapshotDTO ownSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO();
+        ownSnapshot.setStatusSnapshot(statusDto.getAggregateSnapshot().clone());
+        ownSnapshot.setAddress(selectedNodeId.getApiAddress());
+        ownSnapshot.setApiPort(selectedNodeId.getApiPort());
+        ownSnapshot.setNodeId(selectedNodeId.getId());
+        statusDto.getNodeSnapshots().add(ownSnapshot);
+
+        // Fold in every node except the one whose response is the merge target.
+        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final RemoteProcessGroupStatusDTO reportedStatus = nodeEntry.getValue();
+            if (reportedStatus != statusDto) {
+                final NodeIdentifier reportingNode = nodeEntry.getKey();
+                StatusMerger.merge(statusDto, reportedStatus, reportingNode.getId(), reportingNode.getApiAddress(), reportingNode.getApiPort());
+            }
+        }
+    }
+
+    /**
+     * Builds the cluster-wide controller status inside {@code statusDto}.
+     * <p>
+     * Every bulletin of every node (controller, controller service and reporting task bulletins,
+     * including those of {@code statusDto} itself) is stamped with the reporting node's
+     * {@code address:port}. All statuses other than {@code statusDto} are then passed to
+     * {@link StatusMerger}. Finally the bulletins held in this manager's bulletin repository are
+     * appended, the node counts are set, and the pretty-printed fields are refreshed.
+     *
+     * @param statusDto status of the selected node; mutated in place to hold the merged result
+     * @param resultMap controller status keyed by the node that reported it
+     */
+    private void mergeControllerStatus(final ControllerStatusDTO statusDto, final Map<NodeIdentifier, ControllerStatusDTO> resultMap) {
+        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> nodeEntry : resultMap.entrySet()) {
+            final NodeIdentifier reportingNode = nodeEntry.getKey();
+            final ControllerStatusDTO reportedStatus = nodeEntry.getValue();
+
+            // Tag all three bulletin lists with their origin; this also applies to the merge target itself.
+            final String origin = reportingNode.getApiAddress() + ":" + reportingNode.getApiPort();
+            for (final BulletinDTO controllerBulletin : reportedStatus.getBulletins()) {
+                controllerBulletin.setNodeAddress(origin);
+            }
+            for (final BulletinDTO serviceBulletin : reportedStatus.getControllerServiceBulletins()) {
+                serviceBulletin.setNodeAddress(origin);
+            }
+            for (final BulletinDTO taskBulletin : reportedStatus.getReportingTaskBulletins()) {
+                taskBulletin.setNodeAddress(origin);
+            }
+
+            if (reportedStatus != statusDto) {
+                StatusMerger.merge(statusDto, reportedStatus);
+            }
+        }
+
+        final int allNodes = getNodeIds().size();
+        final int connectedNodes = getNodeIds(Status.CONNECTED).size();
+
+        // append the controller-level bulletins held by this manager
+        final List<Bulletin> localControllerBulletins = getBulletinRepository().findBulletinsForController();
+        statusDto.setBulletins(mergeNCMBulletins(statusDto.getBulletins(), localControllerBulletins));
+
+        // append the controller service bulletins held by this manager
+        final BulletinQuery serviceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+        final List<Bulletin> localServiceBulletins = getBulletinRepository().findBulletins(serviceQuery);
+        statusDto.setControllerServiceBulletins(mergeNCMBulletins(statusDto.getControllerServiceBulletins(), localServiceBulletins));
+
+        // append the reporting task bulletins held by this manager
+        final BulletinQuery taskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+        final List<Bulletin> localTaskBulletins = getBulletinRepository().findBulletins(taskQuery);
+        statusDto.setReportingTaskBulletins(mergeNCMBulletins(statusDto.getReportingTaskBulletins(), localTaskBulletins));
+
+        statusDto.setConnectedNodeCount(connectedNodes);
+        statusDto.setTotalNodeCount(allNodes);
+        StatusMerger.updatePrettyPrintedFields(statusDto);
+    }
+
+    /**
+     * Appends this manager's own bulletins to the bulletins gathered from the nodes.
+     *
+     * @param nodeBulletins bulletin DTOs collected from the nodes
+     * @param ncmBulletins bulletins held by this manager; may be {@code null} or empty
+     * @return {@code nodeBulletins} itself when there is nothing to append, otherwise a new list
+     *         holding the node bulletins followed by DTOs created from {@code ncmBulletins}
+     */
+    private List<BulletinDTO> mergeNCMBulletins(final List<BulletinDTO> nodeBulletins, final List<Bulletin> ncmBulletins) {
+        final boolean hasLocalBulletins = ncmBulletins != null && !ncmBulletins.isEmpty();
+        if (hasLocalBulletins) {
+            final List<BulletinDTO> combined = new ArrayList<>(nodeBulletins.size() + ncmBulletins.size());
+            combined.addAll(nodeBulletins);
+            combined.addAll(createBulletinDtos(ncmBulletins));
+            return combined;
+        }
+
+        return nodeBulletins;
+    }
+
+ private void mergeBulletinBoard(final BulletinBoardDTO nodeBulletinBoard, final Map<NodeIdentifier, BulletinBoardDTO> resultMap) {
+ final List<BulletinDTO> bulletinDtos = new ArrayList<>();
+ for (final Map.Entry<NodeIdentifier, BulletinBoardDTO> entry : resultMap.entrySet()) {
+ final NodeIdentifier nodeId = entry.getKey();
+ final BulletinBoardDTO boardDto = entry.getValue();
+ final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
+
+ for (final BulletinDTO bulletin : boardDto.getBulletins()) {
+ bulletin.setNodeAddress(nodeAddress);
+ bulletinDtos.add(bulletin);
+ }
+ }
+
+ Collections.sort(bulletinDtos, new Comparator<BulletinDTO>() {
+ @Override
+ public int compare(final BulletinDTO o1, final BulletinDTO o2) {
+ final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
+ if (timeComparison != 0) {
+ return timeComparison;
+ }
+
+ return o1.getNodeAddress().compareTo(o2.getNodeAddress());
+ }
+ });
+
+ nodeBulletinBoard.setBulletins(bulletinDtos);
+ }
+
+    /**
+     * Converts each of the given bulletins into a BulletinDTO, preserving their order.
+     *
+     * @param bulletins the bulletins to convert
+     * @return a new list holding one DTO per bulletin
+     */
+    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) {
+        final List<BulletinDTO> converted = new ArrayList<>(bulletins.size());
+        for (final Bulletin source : bulletins) {
+            final BulletinDTO sourceDto = createBulletinDto(source);
+            converted.add(sourceDto);
+        }
+
+        return converted;
+    }
+
+    /**
+     * Copies the fields of the given Bulletin into a new BulletinDTO.
+     *
+     * @param bulletin the bulletin to convert
+     * @return a DTO carrying the bulletin's id, timestamp, node address, group/source details,
+     *         category, level and message
+     */
+    public BulletinDTO createBulletinDto(final Bulletin bulletin) {
+        final BulletinDTO bulletinDto = new BulletinDTO();
+
+        // identity and origin
+        bulletinDto.setId(bulletin.getId());
+        bulletinDto.setTimestamp(bulletin.getTimestamp());
+        bulletinDto.setNodeAddress(bulletin.getNodeAddress());
+
+        // component that raised the bulletin
+        bulletinDto.setGroupId(bulletin.getGroupId());
+        bulletinDto.setSourceId(bulletin.getSourceId());
+        bulletinDto.setSourceName(bulletin.getSourceName());
+
+        // content
+        bulletinDto.setCategory(bulletin.getCategory());
+        bulletinDto.setLevel(bulletin.getLevel());
+        bulletinDto.setMessage(bulletin.getMessage());
+
+        return bulletinDto;
+    }
+
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
final ProvenanceResultsDTO results = provenanceDto.getResults();
final ProvenanceRequestDTO request = provenanceDto.getRequest();
@@ -3545,6 +3905,252 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
mergeComponentState(componentState, resultsMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isGroupStatusEndpoint(uri, method)) {
+ final ProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+ final ProcessGroupStatusDTO statusRequest = responseEntity.getProcessGroupStatus();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, ProcessGroupStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ProcessGroupStatusEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessGroupStatusEntity.class);
+ }
+
+ final ProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getProcessGroupStatus();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeGroupStatus(statusRequest, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isProcessorStatusEndpoint(uri, method)) {
+ final ProcessorStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+ final ProcessorStatusDTO statusRequest = responseEntity.getProcessorStatus();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, ProcessorStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ProcessorStatusEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ProcessorStatusEntity.class);
+ }
+
+ final ProcessorStatusDTO nodeStatus = nodeResponseEntity.getProcessorStatus();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeProcessorStatus(statusRequest, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isConnectionStatusEndpoint(uri, method)) {
+ final ConnectionStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ConnectionStatusEntity.class);
+ final ConnectionStatusDTO statusRequest = responseEntity.getConnectionStatus();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, ConnectionStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ConnectionStatusEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(ConnectionStatusEntity.class);
+ }
+
+ final ConnectionStatusDTO nodeStatus = nodeResponseEntity.getConnectionStatus();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeConnectionStatus(statusRequest, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && (isInputPortStatusEndpoint(uri, method) || isOutputPortStatusEndpoint(uri, method))) {
+ final PortStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(PortStatusEntity.class);
+ final PortStatusDTO statusRequest = responseEntity.getPortStatus();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, PortStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final PortStatusEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(PortStatusEntity.class);
+ }
+
+ final PortStatusDTO nodeStatus = nodeResponseEntity.getPortStatus();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergePortStatus(statusRequest, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusEndpoint(uri, method)) {
+ final RemoteProcessGroupStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class);
+ final RemoteProcessGroupStatusDTO statusRequest = responseEntity.getRemoteProcessGroupStatus();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, RemoteProcessGroupStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final RemoteProcessGroupStatusEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(RemoteProcessGroupStatusEntity.class);
+ }
+
+ final RemoteProcessGroupStatusDTO nodeStatus = nodeResponseEntity.getRemoteProcessGroupStatus();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeRemoteProcessGroupStatus(statusRequest, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerStatusEndpoint(uri, method)) {
+ final ControllerStatusEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+ final ControllerStatusDTO statusRequest = responseEntity.getControllerStatus();
+
+ final Map<NodeIdentifier, ControllerStatusDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerStatusEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerStatusEntity.class);
+ final ControllerStatusDTO nodeStatus = nodeResponseEntity.getControllerStatus();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeControllerStatus(statusRequest, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isBulletinBoardEndpoint(uri, method)) {
+ final BulletinBoardEntity responseEntity = clientResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+ final BulletinBoardDTO responseDto = responseEntity.getBulletinBoard();
+
+ final Map<NodeIdentifier, BulletinBoardDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final BulletinBoardEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(BulletinBoardEntity.class);
+ final BulletinBoardDTO nodeStatus = nodeResponseEntity.getBulletinBoard();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeBulletinBoard(responseDto, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isProcessorStatusHistoryEndpoint(uri, method)) {
+ final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+ for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
+ metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+ }
+
+ clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+ } else if (hasSuccessfulClientResponse && isConnectionStatusHistoryEndpoint(uri, method)) {
+ final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+ for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
+ metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+ }
+
+ clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+ } else if (hasSuccessfulClientResponse && isProcessGroupStatusHistoryEndpoint(uri, method)) {
+ final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+ for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
+ metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+ }
+
+ clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupStatusHistoryEndpoint(uri, method)) {
+ final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
+ for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
+ metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
+ }
+
+ clientResponse = mergeStatusHistoryResponses(clientResponse, updatedNodesMap, problematicNodeResponses, metricDescriptors);
+ } else if (hasSuccessfulClientResponse && isSystemDiagnosticsEndpoint(uri, method)) {
+ final SystemDiagnosticsEntity responseEntity = clientResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class);
+ final SystemDiagnosticsDTO responseDto = responseEntity.getSystemDiagnostics();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, SystemDiagnosticsDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final SystemDiagnosticsEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(SystemDiagnosticsEntity.class);
+ }
+
+ final SystemDiagnosticsDTO nodeStatus = nodeResponseEntity.getSystemDiagnostics();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeSystemDiagnostics(responseDto, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isCountersEndpoint(uri, method)) {
+ final CountersEntity responseEntity = clientResponse.getClientResponse().getEntity(CountersEntity.class);
+ final CountersDTO responseDto = responseEntity.getCounters();
+
+ NodeIdentifier nodeIdentifier = null;
+
+ final Map<NodeIdentifier, CountersDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final CountersEntity nodeResponseEntity;
+ if (nodeResponse == clientResponse) {
+ nodeIdentifier = nodeResponse.getNodeId();
+ nodeResponseEntity = responseEntity;
+ } else {
+ nodeResponseEntity = nodeResponse.getClientResponse().getEntity(CountersEntity.class);
+ }
+
+ final CountersDTO nodeStatus = nodeResponseEntity.getCounters();
+ resultsMap.put(nodeResponse.getNodeId(), nodeStatus);
+ }
+ mergeCounters(responseDto, nodeIdentifier, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
@@ -3603,6 +4209,49 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return clientResponse;
}
+
+ private NodeResponse mergeStatusHistoryResponses(NodeResponse clientResponse, Map<Node, NodeResponse> updatedNodesMap, Set<NodeResponse> problematicNodeResponses,
+ Map<String, MetricDescriptor<?>> metricDescriptors) {
+ final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+
+ StatusHistoryDTO lastStatusHistory = null;
+ final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(updatedNodesMap.size());
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final StatusHistoryEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+ final StatusHistoryDTO nodeStatus = nodeResponseEntity.getStatusHistory();
+ lastStatusHistory = nodeStatus;
+
+ final NodeIdentifier nodeId = nodeResponse.getNodeId();
+ final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO();
+ nodeStatusSnapshot.setNodeId(nodeId.getId());
+ nodeStatusSnapshot.setAddress(nodeId.getApiAddress());
+ nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
+ nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
+ nodeStatusSnapshots.add(nodeStatusSnapshot);
+ }
+
+ final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
+ clusterStatusHistory.setAggregateSnapshots(mergeStatusHistories(nodeStatusSnapshots, metricDescriptors));
+ clusterStatusHistory.setGenerated(new Date());
+ clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
+ if (lastStatusHistory != null) {
+ clusterStatusHistory.setComponentDetails(lastStatusHistory.getComponentDetails());
+ clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
+ }
+
+ final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
+ clusterEntity.setStatusHistory(clusterStatusHistory);
+ clusterEntity.setRevision(responseEntity.getRevision());
+
+ return new NodeResponse(clientResponse, clusterEntity);
+ }
+
+
+
/**
* Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least
* one node contained the counter in question).
@@ -3612,7 +4261,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
* @return Whether all problematic node responses were due to a missing counter
*/
private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) {
- if (isCountersEndpoint(uri)) {
+ if (isCounterEndpoint(uri)) {
boolean notFound = true;
for (final NodeResponse problematicResponse : problematicNodeResponses) {
if (problematicResponse.getStatus() != 404) {
@@ -4026,207 +4675,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return bulletinRepository;
}
- @Override
- public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
- final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
-
- // ensure there are some nodes in the cluster
- if (connectedNodes.isEmpty()) {
- throw new NoConnectedNodesException();
- }
-
- ProcessGroupStatus mergedProcessGroupStatus = null;
- for (final Node node : connectedNodes) {
- final NodeIdentifier nodeId = node.getNodeId();
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
- final ProcessGroupStatus nodeRootProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
- final ProcessGroupStatus nodeProcessGroupStatus = groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : getProcessGroupStatus(nodeRootProcessGroupStatus, groupId);
- if (nodeProcessGroupStatus == null) {
- continue;
- }
-
- if (mergedProcessGroupStatus == null) {
- mergedProcessGroupStatus = nodeProcessGroupStatus.clone();
-
- // update any issues with the node label
- if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != null) {
- for (final RemoteProcessGroupStatus remoteProcessGroupStatus : mergedProcessGroupStatus.getRemoteProcessGroupStatus()) {
- final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
- if (!nodeAuthorizationIssues.isEmpty()) {
- for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
- final String Issue = iter.next();
- iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
- }
- remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
- }
- }
- }
- } else {
- final ProcessGroupStatus nodeClone = nodeProcessGroupStatus.clone();
- for (final RemoteProcessGroupStatus remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) {
- final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
- if (!nodeAuthorizationIssues.isEmpty()) {
- for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
- final String Issue = iter.next();
- iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
- }
- remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
- }
- }
-
- ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone);
- }
- }
-
- return mergedProcessGroupStatus;
- }
-
- private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus parent, final String groupId) {
- if (parent.getId().equals(groupId)) {
- return parent;
- }
-
- for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) {
- final ProcessGroupStatus matching = getProcessGroupStatus(child, groupId);
- if (matching != null) {
- return matching;
- }
- }
-
- return null;
- }
-
- @Override
- public SystemDiagnostics getSystemDiagnostics() {
- final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
-
- // ensure there are some nodes...
- if (connectedNodes.isEmpty()) {
- throw new NoConnectedNodesException();
- }
-
- SystemDiagnostics clusterDiagnostics = null;
- for (final Node node : connectedNodes) {
- final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
- if (nodeHeartbeatPayload == null) {
- continue;
- }
- final SystemDiagnostics nodeDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics();
- if (nodeDiagnostics == null) {
- continue;
- }
-
- if (clusterDiagnostics == null) {
- clusterDiagnostics = nodeDiagnostics.clone();
- } else {
- merge(clusterDiagnostics, nodeDiagnostics);
- }
- }
-
- return clusterDiagnostics;
- }
-
- private void merge(final SystemDiagnostics target, final SystemDiagnostics sd) {
-
- // threads
- target.setDaemonThreads(target.getDaemonThreads() + sd.getDaemonThreads());
- target.setTotalThreads(target.getTotalThreads() + sd.getTotalThreads());
-
- // heap
- target.setTotalHeap(target.getTotalHeap() + sd.getTotalHeap());
- target.setUsedHeap(target.getUsedHeap() + sd.getUsedHeap());
- target.setMaxHeap(target.getMaxHeap() + sd.getMaxHeap());
-
- // non heap
- target.setTotalNonHeap(target.getTotalNonHeap() + sd.getTotalNonHeap());
- target.setUsedNonHeap(target.getUsedNonHeap() + sd.getUsedNonHeap());
- target.setMaxNonHeap(target.getMaxNonHeap() + sd.getMaxNonHeap());
-
- // processors
- target.setAvailableProcessors(target.getAvailableProcessors() + sd.getAvailableProcessors());
-
- // load
- if (sd.getProcessorLoadAverage() != null) {
- if (target.getProcessorLoadAverage() != null) {
- target.setProcessorLoadAverage(target.getProcessorLoadAverage() + sd.getProcessorLoadAverage());
- } else {
- target.setProcessorLoadAverage(sd.getProcessorLoadAverage());
- }
- }
-
- // db disk usage
- merge(target.getFlowFileRepositoryStorageUsage(), sd.getFlowFileRepositoryStorageUsage());
-
- // repo disk usage
- final Map<String, StorageUsage> targetContentRepoMap;
- if (target.getContentRepositoryStorageUsage() == null) {
- targetContentRepoMap = new LinkedHashMap<>();
- target.setContentRepositoryStorageUsage(targetContentRepoMap);
- } else {
- targetContentRepoMap = target.getContentRepositoryStorageUsage();
- }
- if (sd.getContentRepositoryStorageUsage() != null) {
- for (final Map.Entry<String, StorageUsage> sdEntry : sd.getContentRepositoryStorageUsage().entrySet()) {
- final StorageUsage mergedDiskUsage = targetContentRepoMap.get(sdEntry.getKey());
- if (mergedDiskUsage == null) {
- targetContentRepoMap.put(sdEntry.getKey(), sdEntry.getValue());
- } else {
- merge(mergedDiskUsage, sdEntry.getValue());
- }
- }
- }
-
- // garbage collection
- final Map<String, GarbageCollection> targetGarbageCollection;
- if (target.getGarbageCollection() == null) {
- targetGarbageCollection = new LinkedHashMap<>();
- target.setGarbageCollection(targetGarbageCollection);
- } else {
- targetGarbageCollection = target.getGarbageCollection();
- }
- if (sd.getGarbageCollection() != null) {
- for (final Map.Entry<String, GarbageCollection> gcEntry : sd.getGarbageCollection().entrySet()) {
- final GarbageCollection mergedGarbageCollection = targetGarbageCollection.get(gcEntry.getKey());
- if (mergedGarbageCollection == null) {
- targetGarbageCollection.put(gcEntry.getKey(), gcEntry.getValue().clone());
- } else {
- merge(mergedGarbageCollection, gcEntry.getValue());
- }
- }
- }
- }
-
- private void merge(final StorageUsage target, final StorageUsage du) {
- target.setFreeSpace(target.getFreeSpace() + du.getFreeSpace());
- target.setTotalSpace(target.getTotalSpace() + du.getTotalSpace());
- }
-
- private void merge(final GarbageCollection target, final GarbageCollection gc) {
- target.setCollectionCount(target.getCollectionCount() + gc.getCollectionCount());
- target.setCollectionTime(target.getCollectionTime() + gc.getCollectionTime());
- }
/**
 * Truncates a timestamp to a multiple of the given interval by subtracting the remainder of
 * dividing its epoch-millisecond value by {@code numMillis}. For timestamps at or after the
 * epoch this yields the start of the interval that the timestamp falls into.
 *
 * @param toNormalize the timestamp to normalize; it is not modified
 * @param numMillis the interval length in milliseconds
 * @return a new Date holding the truncated timestamp
 */
public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
    final long epochMillis = toNormalize.getTime();
    // how far the timestamp sits past the previous multiple of numMillis
    final long offsetIntoInterval = epochMillis % numMillis;
    final long truncatedMillis = epochMillis - offsetIntoInterval;
    return new Date(truncatedMillis);
}
- private NodeDTO createNodeDTO(final Node node) {
- final NodeDTO nodeDto = new NodeDTO();
- final NodeIdentifier nodeId = node.getNodeId();
- nodeDto.setNodeId(nodeId.getId());
- nodeDto.setAddress(nodeId.getApiAddress());
- nodeDto.setApiPort(nodeId.getApiPort());
- nodeDto.setStatus(node.getStatus().name());
- nodeDto.setPrimary(node.equals(getPrimaryNode()));
- final Date connectionRequested = new Date(node.getConnectionRequestedTimestamp());
- nodeDto.setConnectionRequested(connectionRequested);
-
- return nodeDto;
- }
private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) {
// Aggregate the snapshots
@@ -4245,278 +4699,65 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return aggregatedSnapshotDtos;
}
- public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId) {
- return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
- }
-
- public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startDate, final Date endDate, final int preferredDataPoints) {
- final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
- StatusHistoryDTO lastStatusHistory = null;
- final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
- final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) {
+ final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
+ snapshot.setTimestamp(snapshotDto.getTimestamp());
- for (final Node node : getRawNodes()) {
- final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
- if (statusRepository == null) {
- continue;
- }
+ final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+ for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
+ final String metricId = entry.getKey();
+ final Long value = entry.getValue();
- final StatusHistory statusHistory = statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, preferredDataPoints);
- if (statusHistory == null) {
- continue;
- }
-
- processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors());
-
- // record the status history (last) to get the component details for use later
- final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
- lastStatusHistory = statusHistoryDto;
-
- final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
- nodeHistory.setStatusHistory(statusHistoryDto);
- nodeHistory.setNode(createNodeDTO(node));
- nodeHistories.add(nodeHistory);
-
- // collect all of the snapshots to aggregate
- for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
- final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
- List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
- if (snapshots == null) {
- snapshots = new ArrayList<>();
- snapshotsToAggregate.put(normalizedDate, snapshots);
- }
- snapshots.add(snapshot);
+ final MetricDescriptor<?> descriptor = metricDescriptors.get(metricId);
+ if (descriptor != null) {
+ snapshot.addStatusMetric(descriptor, value);
}
}
- // Aggregate the snapshots
- final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
- // get the details for this component from the last status history
- final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
- clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
- final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
- clusterStatusHistory.setGenerated(new Date());
- clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors));
- clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
- clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
- final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
- history.setGenerated(new Date());
- history.setNodeStatusHistory(nodeHistories);
- history.setClusterStatusHistory(clusterStatusHistory);
- return history;
+ return snapshot;
}
- public StatusHistoryDTO createStatusHistoryDto(final StatusHistory statusHistory) {
- final StatusHistoryDTO dto = new StatusHistoryDTO();
-
- dto.setDetails(new LinkedHashMap<>(statusHistory.getComponentDetails()));
- dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory));
- dto.setGenerated(statusHistory.getDateGenerated());
+ private List<StatusSnapshotDTO> mergeStatusHistories(final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots, final Map<String, MetricDescriptor<?>> metricDescriptors) {
+ // We want a Map<Date, List<StatusSnapshot>>, which is a Map of "normalized Date" (i.e., a time range, essentially)
+ // to all Snapshots for that time. The list will contain one snapshot for each node. However, we can have the case
+ // where the NCM has a different value for the componentStatusSnapshotMillis than the nodes have. In this case,
+ // we end up with multiple entries in the List<StatusSnapshot> for the same node/timestamp, which skews our aggregate
+ // results. In order to avoid this, we will use only the latest snapshot for a node that falls into the time range
+ // of interest.
+ // To accomplish this, we have an intermediate data structure, which is a Map of "normalized Date" to an inner Map
+ // of Node Identifier to StatusSnapshot. We then will flatten this Map and aggregate the results.
+ final Map<Date, Map<String, StatusSnapshot>> dateToNodeSnapshots = new TreeMap<>();
- final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>();
- for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) {
- statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot));
- }
- dto.setStatusSnapshots(statusSnapshots);
-
- return dto;
- }
-
- public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId) {
- return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE);
- }
-
- public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startDate, final Date endDate, final int preferredDataPoints) {
- final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
- StatusHistoryDTO lastStatusHistory = null;
- final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
- final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
- for (final Node node : getRawNodes()) {
- final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
- if (statusRepository == null) {
- continue;
- }
-
- final StatusHistory statusHistory = statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, preferredDataPoints);
- if (statusHistory == null) {
- continue;
- }
-
- final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
- // record the status history (last) to get the componet details for use later
- lastStatusHistory = statusHistoryDto;
- connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors());
-
- final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
- nodeHistory.setStatusHistory(statusHistoryDto);
- nodeHistory.setNode(createNodeDTO(node));
- nodeHistories.add(nodeHistory);
-
- // collect all of the snapshots to aggregate
- for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
+ // group status snapshots for each node by date
+ for (final NodeStatusSnapshotsDTO nodeStatusSnapshot : nodeStatusSnapshots) {
+ for (final StatusSnapshotDTO snapshotDto : nodeStatusSnapshot.getStatusSnapshots()) {
+ final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors);
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
- List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
- if (snapshots == null) {
- snapshots = new ArrayList<>();
- snapshotsToAggregate.put(normalizedDate, snapshots);
- }
- snapshots.add(snapshot);
- }
- }
-
- // Aggregate the snapshots
- final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
- // get the details for this component from the last status history
- final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
- clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
- final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
- clusterStatusHistory.setGenerated(new Date());
- clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors));
- clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
- clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
- final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
- history.setGenerated(new Date());
- history.setNodeStatusHistory(nodeHistories);
- history.setClusterStatusHistory(clusterStatusHistory);
- return history;
- }
-
- public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {
- return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE);
- }
-
- public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
- final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
- StatusHistoryDTO lastStatusHistory = null;
- final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
- final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
- for (final Node node : getRawNodes()) {
- final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
- if (statusRepository == null) {
- continue;
- }
-
- final StatusHistory statusHistory = statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, endDate, preferredDataPoints);
- if (statusHistory == null) {
- continue;
- }
-
- final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
- // record the status history (last) to get the componet details for use later
- lastStatusHistory = statusHistoryDto;
- processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors());
-
- final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
- nodeHistory.setStatusHistory(statusHistoryDto);
- nodeHistory.setNode(createNodeDTO(node));
- nodeHistories.add(nodeHistory);
-
- // collect all of the snapshots to aggregate
- for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
- final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
- List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
- if (snapshots == null) {
- snapshots = new ArrayList<>();
- snapshotsToAggregate.put(normalizedDate, snapshots);
+ Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate);
+ if (nodeToSnapshotMap == null) {
+ nodeToSnapshotMap = new HashMap<>();
+ dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
}
- snapshots.add(snapshot);
+ nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot);
}
}
- // Aggregate the snapshots
- final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
-
- // get the details for this component from the last status history
- final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
- clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
-
- final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
- clusterStatusHistory.setGenerated(new Date());
- clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
- clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors));
- clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
-
- final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
- history.setGenerated(new Date());
- history.setNodeStatusHistory(nodeHistories);
- history.setClusterStatusHistory(clusterStatusHistory);
- return history;
- }
-
- public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) {
- return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE);
- }
-
- public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
- final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
-
- StatusHistoryDTO lastStatusHistory = null;
- final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+ // aggregate the snapshots by (normalized) timestamp
final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
-
- for (final Node node : getRawNodes()) {
- final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
- if (statusRepository == null) {
- continue;
- }
-
- final StatusHistory statusHistory = statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, endDate, preferredDataPoints);
- if (statusHistory == null) {
- continue;
- }
-
- final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
- // record the status history (last) to get the componet details for use later
- lastStatusHistory = statusHistoryDto;
- remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors());
-
- final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
- nodeHistory.setStatusHistory(statusHistoryDto);
- nodeHistory.setNode(createNodeDTO(node));
- nodeHistories.add(nodeHistory);
-
- // collect all of the snapshots to aggregate
- for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
- final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
- List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
- if (snapshots == null) {
- snapshots = new ArrayList<>();
- snapshotsToAggregate.put(normalizedDate, snapshots);
- }
- snapshots.add(snapshot);
- }
+ for (final Map.Entry<Date, Map<String, StatusSnapshot>> entry : dateToNodeSnapshots.entrySet()) {
+ final Date normalizedDate = entry.getKey();
+ final Map<String, StatusSnapshot> nodeToSnapshot = entry.getValue();
+ final List<StatusSnapshot> snapshotsForTimestamp = new ArrayList<>(nodeToSnapshot.values());
+ snapshotsToAggregate.put(normalizedDate, snapshotsForTimestamp);
}
- // Aggregate the snapshots
- final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
+ final List<StatusSnapshotDTO> aggregatedSnapshots = aggregate(snapshotsToAggregate);
+ return aggregatedSnapshots;
+ }
- // get the details for this comp
<TRUNCATED>