You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/07/21 18:18:57 UTC
[1/2] nifi git commit: NIFI-106: - Expose processors' counters in
Stats History - Only include counters in Processors' Status History if user
has read access to corresponding Processor - Addressed review feedback. Found
and addressed bug where a counter
Repository: nifi
Updated Branches:
refs/heads/master c54b2ad81 -> 695e8aa98
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
index f861c7a..c253a2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java
@@ -16,8 +16,7 @@
*/
package org.apache.nifi.spring;
-import org.apache.nifi.controller.repository.RingBufferEventRepository;
-
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.springframework.beans.factory.FactoryBean;
public class RingBufferEventRepositoryBean implements FactoryBean<RingBufferEventRepository> {
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
index a160b9e..cb5c306 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.RingBufferEventRepository;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -133,6 +135,11 @@ public class TestRingBufferEventRepository {
public long getBytesSent() {
return 0;
}
+
+ @Override
+ public Map<String, Long> getCounters() {
+ return Collections.emptyMap();
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 420022f..66a5073 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -289,10 +289,12 @@ public class ControllerFacade implements Authorizable {
throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
}
- final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId);
+ final boolean authorized = processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+
+ final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId, authorized);
// if not authorized
- if (!processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
+ if (!authorized) {
statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId);
statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor");
}
[2/2] nifi git commit: NIFI-106: - Expose processors' counters in
Stats History - Only include counters in Processors' Status History if user
has read access to corresponding Processor - Addressed review feedback. Found
and addressed bug where a counter
Posted by mc...@apache.org.
NIFI-106:
- Expose processors' counters in Stats History
- Only include counters in Processors' Status History if user has read access to corresponding Processor
- Addressed review feedback. Found and addressed bug where a counter is not present in all of the aggregate snapshot values for status history, resulting in the UI not rendering the chart properly
- This closes #1872
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/695e8aa9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/695e8aa9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/695e8aa9
Branch: refs/heads/master
Commit: 695e8aa98f1d9cce5a9b3025193ac57f9acd598e
Parents: c54b2ad
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 30 14:40:30 2017 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Jul 21 14:18:11 2017 -0400
----------------------------------------------------------------------
.../nifi/controller/status/ProcessorStatus.java | 14 +
.../history/ComponentStatusRepository.java | 4 +-
.../status/history/StatusHistory.java | 1 +
.../web/api/dto/status/StatusDescriptorDTO.java | 22 ++
.../endpoints/StatusHistoryEndpointMerger.java | 87 +++++-
.../controller/repository/FlowFileEvent.java | 5 +
.../apache/nifi/controller/FlowController.java | 12 +-
.../repository/RingBufferEventRepository.java | 312 -------------------
.../repository/StandardFlowFileEvent.java | 237 --------------
.../repository/StandardProcessSession.java | 74 ++++-
.../repository/metrics/EventContainer.java | 28 ++
.../controller/repository/metrics/EventSum.java | 53 ++++
.../repository/metrics/EventSumValue.java | 207 ++++++++++++
.../metrics/RingBufferEventRepository.java | 67 ++++
.../metrics/SecondPrecisionEventContainer.java | 69 ++++
.../metrics/StandardFlowFileEvent.java | 205 ++++++++++++
.../scheduling/EventDrivenSchedulingAgent.java | 2 +-
.../status/history/StatusHistoryUtil.java | 17 +-
.../VolatileComponentStatusRepository.java | 19 +-
.../tasks/ContinuallyRunProcessorTask.java | 2 +-
.../spring/RingBufferEventRepositoryBean.java | 3 +-
.../TestRingBufferEventRepository.java | 9 +-
.../nifi/web/controller/ControllerFacade.java | 6 +-
23 files changed, 870 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
index 54be7ba..5a4f64a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.status;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -42,6 +44,7 @@ public class ProcessorStatus implements Cloneable {
private long bytesReceived;
private int flowFilesSent;
private long bytesSent;
+ private Map<String, Long> counters;
public String getId() {
return id;
@@ -211,6 +214,14 @@ public class ProcessorStatus implements Cloneable {
this.bytesSent = bytesSent;
}
+ public Map<String, Long> getCounters() {
+ return counters;
+ }
+
+ public void setCounters(final Map<String, Long> counters) {
+ this.counters = counters;
+ }
+
@Override
public ProcessorStatus clone() {
final ProcessorStatus clonedObj = new ProcessorStatus();
@@ -234,6 +245,7 @@ public class ProcessorStatus implements Cloneable {
clonedObj.flowFilesRemoved = flowFilesRemoved;
clonedObj.runStatus = runStatus;
clonedObj.type = type;
+ clonedObj.counters = counters == null ? null : new HashMap<>(counters);
return clonedObj;
}
@@ -268,6 +280,8 @@ public class ProcessorStatus implements Cloneable {
builder.append(processingNanos);
builder.append(", activeThreadCount=");
builder.append(activeThreadCount);
+ builder.append(", counters=");
+ builder.append(counters);
builder.append("]");
return builder.toString();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
index 1042c3f..553903f 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java
@@ -103,10 +103,12 @@ public interface ComponentStatusRepository {
* If the date range is large, the total number of data points could be far
* too many to process. Therefore, this parameter allows the requestor to
* indicate how many samples to return.
+ * @param includeCounters specifies whether or not metrics from Processor counters
+ * should be included in the StatusHistory.
* @return a {@link StatusHistory} that provides the status information
* about the Processor with the given ID during the given time period
*/
- StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints);
+ StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints, boolean includeCounters);
/**
* @param remoteGroupId to get history of
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
index f1bb946..75c609d 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java
@@ -41,4 +41,5 @@ public interface StatusHistory {
* @return List of snapshots for a given component
*/
List<StatusSnapshot> getStatusSnapshots();
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
index 0a3c418..34ee315 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java
@@ -17,6 +17,9 @@
package org.apache.nifi.web.api.dto.status;
import com.wordnik.swagger.annotations.ApiModelProperty;
+
+import java.util.Objects;
+
import javax.xml.bind.annotation.XmlType;
/**
@@ -102,4 +105,23 @@ public class StatusDescriptorDTO {
this.formatter = formatter;
}
+ @Override
+ public int hashCode() {
+ return 31 + 41 * (field == null ? 0 : field.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof StatusDescriptorDTO)) {
+ return false;
+ }
+ final StatusDescriptorDTO other = (StatusDescriptorDTO) obj;
+ return Objects.equals(field, other.field);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 60b40f7..eb96adb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -16,32 +16,39 @@
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.regex.Pattern;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.controller.status.history.ValueMapper;
import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
+import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+
public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history");
@@ -55,7 +62,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
this.componentStatusSnapshotMillis = componentStatusSnapshotMillis;
}
- private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI uri) {
+ private Map<String, MetricDescriptor<?>> getStandardMetricDescriptors(final URI uri) {
final String path = uri.getPath();
final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
@@ -87,16 +94,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
return false;
}
- final Map<String, MetricDescriptor<?>> descriptors = getMetricDescriptors(uri);
+ final Map<String, MetricDescriptor<?>> descriptors = getStandardMetricDescriptors(uri);
return descriptors != null && !descriptors.isEmpty();
}
@Override
public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) {
- final Map<String, MetricDescriptor<?>> metricDescriptors = getMetricDescriptors(uri);
+ final Map<String, MetricDescriptor<?>> metricDescriptors = getStandardMetricDescriptors(uri);
final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
+ final Set<StatusDescriptorDTO> fieldDescriptors = new LinkedHashSet<>();
+
+ boolean includeCounters = true;
StatusHistoryDTO lastStatusHistory = null;
final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(successfulResponses.size());
LinkedHashMap<String, String> noReadPermissionsComponentDetails = null;
@@ -109,6 +119,10 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
noReadPermissionsComponentDetails = nodeStatus.getComponentDetails();
}
+ if (!Boolean.TRUE.equals(nodeResponseEntity.getCanRead())) {
+ includeCounters = false;
+ }
+
final NodeIdentifier nodeId = nodeResponse.getNodeId();
final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO();
nodeStatusSnapshot.setNodeId(nodeId.getId());
@@ -116,6 +130,38 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
nodeStatusSnapshots.add(nodeStatusSnapshot);
+
+ final List<StatusDescriptorDTO> descriptors = nodeStatus.getFieldDescriptors();
+ if (descriptors != null) {
+ fieldDescriptors.addAll(descriptors);
+ }
+ }
+
+ // If there's a status descriptor that is in the fieldDescriptors, but is not in the standard metric descriptors that we find,
+ // then it is a counter metric and should be included only if all StatusHistoryDTO's include counter metrics. This is done because
+ // we include counters in the status history only if the user is authorized to read the Processor. Since it's possible for the nodes
+ // to disagree about who is authorized (if, for example, the authorizer is asynchronously updated), then if any node indicates that
+ // the user is not authorized, we want to assume that the user is, in fact, not authorized.
+ if (includeCounters) {
+ for (final StatusDescriptorDTO descriptorDto : fieldDescriptors) {
+ final String fieldName = descriptorDto.getField();
+
+ if (!metricDescriptors.containsKey(fieldName)) {
+ final ValueMapper<ProcessorStatus> valueMapper = s -> {
+ final Map<String, Long> counters = s.getCounters();
+ if (counters == null) {
+ return 0L;
+ }
+
+ return counters.getOrDefault(descriptorDto.getField(), 0L);
+ };
+
+ final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
+ descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
+
+ metricDescriptors.put(fieldName, metricDescriptor);
+ }
+ }
}
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
@@ -124,8 +170,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
if (lastStatusHistory != null) {
clusterStatusHistory.setComponentDetails(noReadPermissionsComponentDetails == null ? lastStatusHistory.getComponentDetails() : noReadPermissionsComponentDetails);
- clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
}
+ clusterStatusHistory.setFieldDescriptors(new ArrayList<>(fieldDescriptors));
final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
clusterEntity.setStatusHistory(clusterStatusHistory);
@@ -177,6 +223,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(snapshotDto.getTimestamp());
+ // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead
+ // of missing all together.
+ for (final MetricDescriptor<?> descriptor : metricDescriptors.values()) {
+ snapshot.addStatusMetric(descriptor, 0L);
+
+ // If the DTO doesn't have an entry for the metric, add with a value of 0.
+ final Map<String, Long> dtoMetrics = snapshotDto.getStatusMetrics();
+ final String field = descriptor.getField();
+ if (!dtoMetrics.containsKey(field)) {
+ dtoMetrics.put(field, 0L);
+ }
+ }
+
final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
final String metricId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
index f07a530..26cea50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.repository;
+import java.util.Map;
+
public interface FlowFileEvent {
String getComponentIdentifier();
@@ -51,4 +53,7 @@ public interface FlowFileEvent {
long getBytesSent();
int getInvocations();
+
+ Map<String, Long> getCounters();
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 9438946..8da894f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2943,6 +2943,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setFlowFilesSent(entry.getFlowFilesSent());
status.setBytesSent(entry.getBytesSent());
status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
+
+ if (isProcessorAuthorized) {
+ status.setCounters(entry.getCounters());
+ }
}
// Determine the run status and get any validation error... only validating while STOPPED
@@ -4482,12 +4486,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints));
}
- public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
- return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
+ public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final boolean includeCounters) {
+ return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters);
}
- public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints) {
- return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints));
+ public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints, final boolean includeCounters) {
+ return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters));
}
public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
deleted file mode 100644
index fb3cdd2..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class RingBufferEventRepository implements FlowFileEventRepository {
-
- private final int numMinutes;
- private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
-
- public RingBufferEventRepository(final int numMinutes) {
- this.numMinutes = numMinutes;
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void updateRepository(final FlowFileEvent event) {
- final String componentId = event.getComponentIdentifier();
- EventContainer eventContainer = componentEventMap.get(componentId);
- if (eventContainer == null) {
- eventContainer = new SecondPrecisionEventContainer(numMinutes);
- final EventContainer oldEventContainer = componentEventMap.putIfAbsent(componentId, eventContainer);
- if (oldEventContainer != null) {
- eventContainer = oldEventContainer;
- }
- }
-
- eventContainer.addEvent(event);
- }
-
- @Override
- public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
- final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
-
- for (final Map.Entry<String, EventContainer> entry : componentEventMap.entrySet()) {
- final String consumerId = entry.getKey();
- final EventContainer container = entry.getValue();
-
- final FlowFileEvent reportEntry = container.generateReport(consumerId, sinceEpochMillis);
- report.addReportEntry(reportEntry);
- }
-
- return report;
- }
-
- @Override
- public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
- // This is done so that if a processor is removed from the graph, its events
- // will be removed rather than being kept in memory
- for (final EventContainer container : componentEventMap.values()) {
- container.purgeEvents(cutoffEpochMilliseconds);
- }
- }
-
- private static interface EventContainer {
-
- public void addEvent(FlowFileEvent event);
-
- public void purgeEvents(long cutoffEpochMillis);
-
- public FlowFileEvent generateReport(String consumerId, long sinceEpochMillis);
- }
-
- private class EventSum {
-
- private final AtomicReference<EventSumValue> ref = new AtomicReference<>(new EventSumValue());
-
- private void add(final FlowFileEvent event) {
- EventSumValue newValue;
- EventSumValue value;
- do {
- value = ref.get();
- newValue = new EventSumValue(value,
- event.getFlowFilesIn(), event.getFlowFilesOut(), event.getFlowFilesRemoved(),
- event.getContentSizeIn(), event.getContentSizeOut(), event.getContentSizeRemoved(),
- event.getBytesRead(), event.getBytesWritten(),
- event.getFlowFilesReceived(), event.getBytesReceived(),
- event.getFlowFilesSent(), event.getBytesSent(),
- event.getProcessingNanoseconds(), event.getInvocations(), event.getAggregateLineageMillis());
- } while (!ref.compareAndSet(value, newValue));
- }
-
- public EventSumValue getValue() {
- return ref.get();
- }
-
- public void addOrReset(final FlowFileEvent event) {
- final long expectedMinute = System.currentTimeMillis() / 60000;
-
- final EventSumValue curValue = ref.get();
- if (curValue.getMinuteTimestamp() != expectedMinute) {
- ref.compareAndSet(curValue, new EventSumValue());
- }
- add(event);
- }
- }
-
- private static class EventSumValue {
-
- private final int flowFilesIn, flowFilesOut, flowFilesRemoved;
- private final long contentSizeIn, contentSizeOut, contentSizeRemoved;
- private final long bytesRead, bytesWritten;
- private final int flowFilesReceived, flowFilesSent;
- private final long bytesReceived, bytesSent;
- private final long processingNanos;
- private final long aggregateLineageMillis;
- private final int invocations;
-
- private final long minuteTimestamp;
- private final long millisecondTimestamp;
-
- public EventSumValue() {
- flowFilesIn = flowFilesOut = flowFilesRemoved = 0;
- contentSizeIn = contentSizeOut = contentSizeRemoved = 0;
- bytesRead = bytesWritten = 0;
- flowFilesReceived = flowFilesSent = 0;
- bytesReceived = bytesSent = 0L;
- processingNanos = invocations = 0;
- aggregateLineageMillis = 0L;
- this.millisecondTimestamp = System.currentTimeMillis();
- this.minuteTimestamp = millisecondTimestamp / 60000;
- }
-
- public EventSumValue(final EventSumValue base, final int flowFilesIn, final int flowFilesOut, final int flowFilesRemoved,
- final long contentSizeIn, final long contentSizeOut, final long contentSizeRemoved,
- final long bytesRead, final long bytesWritten,
- final int flowFilesReceived, final long bytesReceived,
- final int flowFilesSent, final long bytesSent,
- final long processingNanos, final int invocations, final long aggregateLineageMillis) {
- this.flowFilesIn = base.flowFilesIn + flowFilesIn;
- this.flowFilesOut = base.flowFilesOut + flowFilesOut;
- this.flowFilesRemoved = base.flowFilesRemoved + flowFilesRemoved;
- this.contentSizeIn = base.contentSizeIn + contentSizeIn;
- this.contentSizeOut = base.contentSizeOut + contentSizeOut;
- this.contentSizeRemoved = base.contentSizeRemoved + contentSizeRemoved;
- this.bytesRead = base.bytesRead + bytesRead;
- this.bytesWritten = base.bytesWritten + bytesWritten;
- this.flowFilesReceived = base.flowFilesReceived + flowFilesReceived;
- this.bytesReceived = base.bytesReceived + bytesReceived;
- this.flowFilesSent = base.flowFilesSent + flowFilesSent;
- this.bytesSent = base.bytesSent + bytesSent;
- this.processingNanos = base.processingNanos + processingNanos;
- this.invocations = base.invocations + invocations;
- this.aggregateLineageMillis = base.aggregateLineageMillis + aggregateLineageMillis;
- this.millisecondTimestamp = System.currentTimeMillis();
- this.minuteTimestamp = millisecondTimestamp / 60000;
- }
-
- public long getTimestamp() {
- return millisecondTimestamp;
- }
-
- public long getMinuteTimestamp() {
- return minuteTimestamp;
- }
-
- public long getBytesRead() {
- return bytesRead;
- }
-
- public long getBytesWritten() {
- return bytesWritten;
- }
-
- public int getFlowFilesIn() {
- return flowFilesIn;
- }
-
- public int getFlowFilesOut() {
- return flowFilesOut;
- }
-
- public long getContentSizeIn() {
- return contentSizeIn;
- }
-
- public long getContentSizeOut() {
- return contentSizeOut;
- }
-
- public int getFlowFilesRemoved() {
- return flowFilesRemoved;
- }
-
- public long getContentSizeRemoved() {
- return contentSizeRemoved;
- }
-
- public long getProcessingNanoseconds() {
- return processingNanos;
- }
-
- public int getInvocations() {
- return invocations;
- }
-
- public long getAggregateLineageMillis() {
- return aggregateLineageMillis;
- }
-
- public int getFlowFilesReceived() {
- return flowFilesReceived;
- }
-
- public int getFlowFilesSent() {
- return flowFilesSent;
- }
-
- public long getBytesReceived() {
- return bytesReceived;
- }
-
- public long getBytesSent() {
- return bytesSent;
- }
- }
-
- private class SecondPrecisionEventContainer implements EventContainer {
-
- private final int numBins;
- private final EventSum[] sums;
-
- public SecondPrecisionEventContainer(final int numMinutes) {
- numBins = 1 + numMinutes * 60;
- sums = new EventSum[numBins];
-
- for (int i = 0; i < numBins; i++) {
- sums[i] = new EventSum();
- }
- }
-
- @Override
- public void addEvent(final FlowFileEvent event) {
- final int second = (int) (System.currentTimeMillis() / 1000);
- final int binIdx = (int) (second % numBins);
- final EventSum sum = sums[binIdx];
-
- sum.addOrReset(event);
- }
-
- @Override
- public void purgeEvents(final long cutoffEpochMilliseconds) {
- // no need to do anything
- }
-
- @Override
- public FlowFileEvent generateReport(final String consumerId, final long sinceEpochMillis) {
- int flowFilesIn = 0, flowFilesOut = 0, flowFilesRemoved = 0;
- long contentSizeIn = 0L, contentSizeOut = 0L, contentSizeRemoved = 0L;
- long bytesRead = 0L, bytesWritten = 0L;
- int invocations = 0;
- long processingNanos = 0L;
- long aggregateLineageMillis = 0L;
- int flowFilesReceived = 0, flowFilesSent = 0;
- long bytesReceived = 0L, bytesSent = 0L;
-
- final long second = sinceEpochMillis / 1000;
- final int startBinIdx = (int) (second % numBins);
-
- for (int i = 0; i < numBins; i++) {
- int binIdx = (startBinIdx + i) % numBins;
- final EventSum sum = sums[binIdx];
-
- final EventSumValue sumValue = sum.getValue();
- if (sumValue.getTimestamp() >= sinceEpochMillis) {
- flowFilesIn += sumValue.getFlowFilesIn();
- flowFilesOut += sumValue.getFlowFilesOut();
- flowFilesRemoved += sumValue.getFlowFilesRemoved();
- contentSizeIn += sumValue.getContentSizeIn();
- contentSizeOut += sumValue.getContentSizeOut();
- contentSizeRemoved += sumValue.getContentSizeRemoved();
- bytesRead += sumValue.getBytesRead();
- bytesWritten += sumValue.getBytesWritten();
- flowFilesReceived += sumValue.getFlowFilesReceived();
- bytesReceived += sumValue.getBytesReceived();
- flowFilesSent += sumValue.getFlowFilesSent();
- bytesSent += sumValue.getBytesSent();
- invocations += sumValue.getInvocations();
- processingNanos += sumValue.getProcessingNanoseconds();
- aggregateLineageMillis += sumValue.getAggregateLineageMillis();
- }
- }
-
- return new StandardFlowFileEvent(consumerId, flowFilesIn, contentSizeIn,
- flowFilesOut, contentSizeOut, flowFilesRemoved, contentSizeRemoved,
- bytesRead, bytesWritten, flowFilesReceived, bytesReceived, flowFilesSent, bytesSent,
- invocations, aggregateLineageMillis, processingNanos);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
deleted file mode 100644
index d584735..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
-
- private final String componentId;
-
- private int flowFilesIn;
- private int flowFilesOut;
- private int flowFilesRemoved;
- private long contentSizeIn;
- private long contentSizeOut;
- private long contentSizeRemoved;
- private long bytesRead;
- private long bytesWritten;
- private long processingNanos;
- private long aggregateLineageMillis;
- private int flowFilesReceived;
- private long bytesReceived;
- private int flowFilesSent;
- private long bytesSent;
- private int invocations;
-
- public StandardFlowFileEvent(final String componentId) {
- this.componentId = componentId;
- }
-
- public StandardFlowFileEvent(final String componentId,
- final int flowFilesIn, final long contentSizeIn,
- final int flowFilesOut, final long contentSizeOut,
- final int flowFilesRemoved, final long contentSizeRemoved,
- final long bytesRead, final long bytesWritten,
- final int flowFilesReceived, final long bytesReceived,
- final int flowFilesSent, final long bytesSent,
- final int invocations, final long averageLineageMillis, final long processingNanos) {
- this.componentId = componentId;
- this.flowFilesIn = flowFilesIn;
- this.contentSizeIn = contentSizeIn;
- this.flowFilesOut = flowFilesOut;
- this.contentSizeOut = contentSizeOut;
- this.flowFilesRemoved = flowFilesRemoved;
- this.contentSizeRemoved = contentSizeRemoved;
- this.bytesRead = bytesRead;
- this.bytesWritten = bytesWritten;
- this.invocations = invocations;
- this.flowFilesReceived = flowFilesReceived;
- this.bytesReceived = bytesReceived;
- this.flowFilesSent = flowFilesSent;
- this.bytesSent = bytesSent;
- this.aggregateLineageMillis = averageLineageMillis;
- this.processingNanos = processingNanos;
- }
-
- public StandardFlowFileEvent(final FlowFileEvent other) {
- this.componentId = other.getComponentIdentifier();
- this.flowFilesIn = other.getFlowFilesIn();
- this.contentSizeIn = other.getContentSizeIn();
- this.flowFilesOut = other.getFlowFilesOut();
- this.contentSizeOut = other.getContentSizeOut();
- this.flowFilesRemoved = other.getFlowFilesRemoved();
- this.contentSizeRemoved = other.getContentSizeRemoved();
- this.bytesRead = other.getBytesRead();
- this.bytesWritten = other.getBytesWritten();
- this.invocations = other.getInvocations();
- this.flowFilesReceived = other.getFlowFilesReceived();
- this.bytesReceived = other.getBytesReceived();
- this.flowFilesSent = other.getFlowFilesSent();
- this.bytesSent = other.getBytesSent();
- this.aggregateLineageMillis = other.getAggregateLineageMillis();
- this.processingNanos = other.getProcessingNanoseconds();
- }
-
- @Override
- public String getComponentIdentifier() {
- return componentId;
- }
-
- @Override
- public int getFlowFilesIn() {
- return flowFilesIn;
- }
-
- public void setFlowFilesIn(int flowFilesIn) {
- this.flowFilesIn = flowFilesIn;
- }
-
- @Override
- public int getFlowFilesOut() {
- return flowFilesOut;
- }
-
- public void setFlowFilesOut(int flowFilesOut) {
- this.flowFilesOut = flowFilesOut;
- }
-
- @Override
- public long getContentSizeIn() {
- return contentSizeIn;
- }
-
- public void setContentSizeIn(long contentSizeIn) {
- this.contentSizeIn = contentSizeIn;
- }
-
- @Override
- public long getContentSizeOut() {
- return contentSizeOut;
- }
-
- public void setContentSizeOut(long contentSizeOut) {
- this.contentSizeOut = contentSizeOut;
- }
-
- @Override
- public long getContentSizeRemoved() {
- return contentSizeRemoved;
- }
-
- public void setContentSizeRemoved(final long contentSizeRemoved) {
- this.contentSizeRemoved = contentSizeRemoved;
- }
-
- @Override
- public int getFlowFilesRemoved() {
- return flowFilesRemoved;
- }
-
- public void setFlowFilesRemoved(final int flowFilesRemoved) {
- this.flowFilesRemoved = flowFilesRemoved;
- }
-
- @Override
- public long getBytesRead() {
- return bytesRead;
- }
-
- public void setBytesRead(long bytesRead) {
- this.bytesRead = bytesRead;
- }
-
- @Override
- public long getBytesWritten() {
- return bytesWritten;
- }
-
- public void setBytesWritten(long bytesWritten) {
- this.bytesWritten = bytesWritten;
- }
-
- @Override
- public long getProcessingNanoseconds() {
- return processingNanos;
- }
-
- public void setProcessingNanos(final long processingNanos) {
- this.processingNanos = processingNanos;
- }
-
- @Override
- public int getInvocations() {
- return invocations;
- }
-
- public void setInvocations(final int invocations) {
- this.invocations = invocations;
- }
-
- @Override
- public int getFlowFilesReceived() {
- return flowFilesReceived;
- }
-
- public void setFlowFilesReceived(int flowFilesReceived) {
- this.flowFilesReceived = flowFilesReceived;
- }
-
- @Override
- public long getBytesReceived() {
- return bytesReceived;
- }
-
- public void setBytesReceived(long bytesReceived) {
- this.bytesReceived = bytesReceived;
- }
-
- @Override
- public int getFlowFilesSent() {
- return flowFilesSent;
- }
-
- public void setFlowFilesSent(int flowFilesSent) {
- this.flowFilesSent = flowFilesSent;
- }
-
- @Override
- public long getBytesSent() {
- return bytesSent;
- }
-
- public void setBytesSent(long bytesSent) {
- this.bytesSent = bytesSent;
- }
-
- @Override
- public long getAverageLineageMillis() {
- if (flowFilesOut == 0 && flowFilesRemoved == 0) {
- return 0L;
- }
-
- return aggregateLineageMillis / (flowFilesOut + flowFilesRemoved);
- }
-
- public void setAggregateLineageMillis(long lineageMilliseconds) {
- this.aggregateLineageMillis = lineageMilliseconds;
- }
-
- @Override
- public long getAggregateLineageMillis() {
- return aggregateLineageMillis;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index d2a6af6..6393014 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -57,6 +57,7 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
@@ -108,7 +109,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
- private final Map<String, Long> counters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context;
private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
@@ -117,6 +117,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final long sessionId;
private final String connectableDescription;
+ private Map<String, Long> countersOnCommit;
+ private Map<String, Long> immediateCounters;
+
private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>();
@@ -438,8 +441,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
- for (final Map.Entry<String, Long> entry : checkpoint.counters.entrySet()) {
- adjustCounter(entry.getKey(), entry.getValue(), true);
+ for (final Map.Entry<String, Long> entry : checkpoint.countersOnCommit.entrySet()) {
+ context.adjustCounter(entry.getKey(), entry.getValue());
}
acknowledgeRecords();
@@ -533,6 +536,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
flowFileEvent.setAggregateLineageMillis(lineageMillis);
+ final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
+ flowFileEvent.setCounters(counters);
+
context.getFlowFileEventRepository().updateRepository(flowFileEvent);
for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
@@ -543,6 +549,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
+ private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first == null) {
+ return second;
+ }
+ if (second == null) {
+ return first;
+ }
+
+ final Map<String, Long> combined = new HashMap<>();
+ combined.putAll(first);
+ combined.putAll(second);
+ return combined;
+ }
+
private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
Set<ProvenanceEventType> eventTypes = map.get(id);
if (eventTypes == null) {
@@ -1013,6 +1036,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
flowFileEvent.setBytesRead(bytesRead);
flowFileEvent.setBytesWritten(bytesWritten);
+ flowFileEvent.setCounters(immediateCounters);
// update event repository
try {
@@ -1106,7 +1130,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
connectionCounts.clear();
createdFlowFiles.clear();
removedFlowFiles.clear();
- counters.clear();
+ if (countersOnCommit != null) {
+ countersOnCommit.clear();
+ }
+ if (immediateCounters != null) {
+ immediateCounters.clear();
+ }
generatedProvenanceEvents.clear();
forkEventBuilders.clear();
@@ -1441,12 +1470,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void adjustCounter(final String name, final long delta, final boolean immediate) {
+ final Map<String, Long> counters;
if (immediate) {
- context.adjustCounter(name, delta);
- return;
+ if (immediateCounters == null) {
+ immediateCounters = new HashMap<>();
+ }
+ counters = immediateCounters;
+ } else {
+ if (countersOnCommit == null) {
+ countersOnCommit = new HashMap<>();
+ }
+ counters = countersOnCommit;
}
adjustCounter(name, delta, counters);
+
+ if (immediate) {
+ context.adjustCounter(name, delta);
+ }
}
private void adjustCounter(final String name, final long delta, final Map<String, Long> map) {
@@ -3216,7 +3257,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
- private final Map<String, Long> counters = new HashMap<>();
+
+ private Map<String, Long> countersOnCommit = new HashMap<>();
+ private Map<String, Long> immediateCounters = new HashMap<>();
private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
private final Set<String> removedFlowFiles = new HashSet<>();
@@ -3242,7 +3285,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.records.putAll(session.records);
this.connectionCounts.putAll(session.connectionCounts);
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
- this.counters.putAll(session.counters);
+
+ if (session.countersOnCommit != null) {
+ if (this.countersOnCommit == null) {
+ this.countersOnCommit = new HashMap<>();
+ }
+
+ this.countersOnCommit.putAll(session.countersOnCommit);
+ }
+
+ if (session.immediateCounters != null) {
+ if (this.immediateCounters == null) {
+ this.immediateCounters = new HashMap<>();
+ }
+
+ this.immediateCounters.putAll(session.immediateCounters);
+ }
this.deleteOnCommit.putAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles);
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
new file mode 100644
index 0000000..9dd3c8e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public interface EventContainer {
+ public void addEvent(FlowFileEvent event);
+
+ public void purgeEvents(long cutoffEpochMillis);
+
+ public FlowFileEvent generateReport(String componentId, long sinceEpochMillis);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
new file mode 100644
index 0000000..b1c9120
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class EventSum {
+
+ private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
+
+ public EventSumValue getValue() {
+ final EventSumValue value = ref.get();
+ return value == null ? new EventSumValue() : value;
+ }
+
+ public void addOrReset(final FlowFileEvent event) {
+ final long expectedMinute = System.currentTimeMillis() / 60000;
+
+ EventSumValue curValue;
+ while (true) {
+ curValue = ref.get();
+ if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
+ final EventSumValue newValue = new EventSumValue();
+ final boolean replaced = ref.compareAndSet(curValue, newValue);
+ if (replaced) {
+ curValue = newValue;
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ curValue.add(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
new file mode 100644
index 0000000..3306e2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class EventSumValue {
+
+ private final AtomicInteger flowFilesIn = new AtomicInteger(0);
+ private final AtomicInteger flowFilesOut = new AtomicInteger(0);
+ private final AtomicInteger flowFilesRemoved = new AtomicInteger(0);
+ private final AtomicInteger flowFilesReceived = new AtomicInteger(0);
+ private final AtomicInteger flowFilesSent = new AtomicInteger(0);
+
+ private final AtomicLong contentSizeIn = new AtomicLong(0L);
+ private final AtomicLong contentSizeOut = new AtomicLong(0L);
+ private final AtomicLong contentSizeRemoved = new AtomicLong(0L);
+ private final AtomicLong bytesRead = new AtomicLong(0L);
+ private final AtomicLong bytesWritten = new AtomicLong(0L);
+
+ private final AtomicLong bytesReceived = new AtomicLong(0L);
+ private final AtomicLong bytesSent = new AtomicLong(0L);
+ private final AtomicLong processingNanos = new AtomicLong(0L);
+ private final AtomicLong aggregateLineageMillis = new AtomicLong(0L);
+ private final AtomicInteger invocations = new AtomicInteger(0);
+ private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();
+
+ private final long minuteTimestamp;
+ private final long millisecondTimestamp;
+
+
+ public EventSumValue() {
+ this.millisecondTimestamp = System.currentTimeMillis();
+ this.minuteTimestamp = millisecondTimestamp / 60000;
+ }
+
+ public void add(final FlowFileEvent flowFileEvent) {
+ this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis());
+ this.bytesRead.addAndGet(flowFileEvent.getBytesRead());
+ this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived());
+ this.bytesSent.addAndGet(flowFileEvent.getBytesSent());
+ this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten());
+ this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn());
+ this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut());
+ this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved());
+ this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn());
+ this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut());
+ this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived());
+ this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved());
+ this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent());
+ this.invocations.addAndGet(flowFileEvent.getInvocations());
+ this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds());
+
+ final Map<String, Long> eventCounters = flowFileEvent.getCounters();
+ if (eventCounters != null) {
+ for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+ final String counterName = entry.getKey();
+ final Long counterValue = entry.getValue();
+
+ counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
+ }
+ }
+ }
+
+ public FlowFileEvent toFlowFileEvent(final String componentId) {
+ final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
+ event.setAggregateLineageMillis(getAggregateLineageMillis());
+ event.setBytesRead(getBytesRead());
+ event.setBytesReceived(getBytesReceived());
+ event.setBytesSent(getBytesSent());
+ event.setBytesWritten(getBytesWritten());
+ event.setContentSizeIn(getContentSizeIn());
+ event.setContentSizeOut(getContentSizeOut());
+ event.setContentSizeRemoved(getContentSizeRemoved());
+ event.setFlowFilesIn(getFlowFilesIn());
+ event.setFlowFilesOut(getFlowFilesOut());
+ event.setFlowFilesReceived(getFlowFilesReceived());
+ event.setFlowFilesRemoved(getFlowFilesRemoved());
+ event.setFlowFilesSent(getFlowFilesSent());
+ event.setInvocations(getInvocations());
+ event.setProcessingNanos(getProcessingNanoseconds());
+ event.setCounters(Collections.unmodifiableMap(this.counters));
+ return event;
+ }
+
+ public void add(final EventSumValue other) {
+ this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis());
+ this.bytesRead.addAndGet(other.getBytesRead());
+ this.bytesReceived.addAndGet(other.getBytesReceived());
+ this.bytesSent.addAndGet(other.getBytesSent());
+ this.bytesWritten.addAndGet(other.getBytesWritten());
+ this.contentSizeIn.addAndGet(other.getContentSizeIn());
+ this.contentSizeOut.addAndGet(other.getContentSizeOut());
+ this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved());
+ this.flowFilesIn.addAndGet(other.getFlowFilesIn());
+ this.flowFilesOut.addAndGet(other.getFlowFilesOut());
+ this.flowFilesReceived.addAndGet(other.getFlowFilesReceived());
+ this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved());
+ this.flowFilesSent.addAndGet(other.getFlowFilesSent());
+ this.invocations.addAndGet(other.getInvocations());
+ this.processingNanos.addAndGet(other.getProcessingNanoseconds());
+
+ final Map<String, Long> eventCounters = other.getCounters();
+ if (eventCounters != null) {
+ for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
+ final String counterName = entry.getKey();
+ final Long counterValue = entry.getValue();
+
+ counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
+ }
+ }
+ }
+
+ public long getTimestamp() {
+ return millisecondTimestamp;
+ }
+
+ public long getMinuteTimestamp() {
+ return minuteTimestamp;
+ }
+
+ public long getBytesRead() {
+ return bytesRead.get();
+ }
+
+ public long getBytesWritten() {
+ return bytesWritten.get();
+ }
+
+ public int getFlowFilesIn() {
+ return flowFilesIn.get();
+ }
+
+ public int getFlowFilesOut() {
+ return flowFilesOut.get();
+ }
+
+ public long getContentSizeIn() {
+ return contentSizeIn.get();
+ }
+
+ public long getContentSizeOut() {
+ return contentSizeOut.get();
+ }
+
+ public int getFlowFilesRemoved() {
+ return flowFilesRemoved.get();
+ }
+
+ public long getContentSizeRemoved() {
+ return contentSizeRemoved.get();
+ }
+
+ public long getProcessingNanoseconds() {
+ return processingNanos.get();
+ }
+
+ public int getInvocations() {
+ return invocations.get();
+ }
+
+ public long getAggregateLineageMillis() {
+ return aggregateLineageMillis.get();
+ }
+
+ public int getFlowFilesReceived() {
+ return flowFilesReceived.get();
+ }
+
+ public int getFlowFilesSent() {
+ return flowFilesSent.get();
+ }
+
+ public long getBytesReceived() {
+ return bytesReceived.get();
+ }
+
+ public long getBytesSent() {
+ return bytesSent.get();
+ }
+
+ public Map<String, Long> getCounters() {
+ return counters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
new file mode 100644
index 0000000..b9a82ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
+
+public class RingBufferEventRepository implements FlowFileEventRepository {
+
+ private final int numMinutes;
+ private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
+
+ public RingBufferEventRepository(final int numMinutes) {
+ this.numMinutes = numMinutes;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void updateRepository(final FlowFileEvent event) {
+ final String componentId = event.getComponentIdentifier();
+ final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
+ eventContainer.addEvent(event);
+ }
+
+ @Override
+ public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
+ final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
+
+ componentEventMap.entrySet().stream()
+ .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
+ .forEach(event -> report.addReportEntry(event));
+
+ return report;
+ }
+
+ @Override
+ public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
+ // This is done so that if a processor is removed from the graph, its events
+ // will be removed rather than being kept in memory
+ for (final EventContainer container : componentEventMap.values()) {
+ container.purgeEvents(cutoffEpochMilliseconds);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
new file mode 100644
index 0000000..72a8cfc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.metrics;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public class SecondPrecisionEventContainer implements EventContainer {
+ private final int numBins;
+ private final EventSum[] sums;
+
+ public SecondPrecisionEventContainer(final int numMinutes) {
+ numBins = 1 + numMinutes * 60;
+ sums = new EventSum[numBins];
+
+ for (int i = 0; i < numBins; i++) {
+ sums[i] = new EventSum();
+ }
+ }
+
+ @Override
+ public void addEvent(final FlowFileEvent event) {
+ // Take the modulo on the long epoch second: casting to int first overflows in 2038 and yields a negative bin index.
+ final long second = System.currentTimeMillis() / 1000;
+ final int binIdx = (int) (second % numBins);
+ final EventSum sum = sums[binIdx];
+ sum.addOrReset(event);
+ }
+
+ @Override
+ public void purgeEvents(final long cutoffEpochMilliseconds) {
+ // no need to do anything
+ }
+
+ @Override
+ public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
+ final EventSumValue eventSumValue = new EventSumValue();
+ final long second = sinceEpochMillis / 1000;
+ final int startBinIdx = (int) (second % numBins);
+
+ for (int i = 0; i < numBins; i++) {
+ int binIdx = (startBinIdx + i) % numBins;
+ final EventSum sum = sums[binIdx];
+
+ final EventSumValue sumValue = sum.getValue();
+ if (sumValue.getTimestamp() >= sinceEpochMillis) {
+ eventSumValue.add(sumValue);
+ }
+ }
+
+ final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
+ return flowFileEvent;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
new file mode 100644
index 0000000..40ec983
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository.metrics;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.repository.FlowFileEvent;
+
+public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
+
+ private final String componentId;
+
+ private int flowFilesIn;
+ private int flowFilesOut;
+ private int flowFilesRemoved;
+ private long contentSizeIn;
+ private long contentSizeOut;
+ private long contentSizeRemoved;
+ private long bytesRead;
+ private long bytesWritten;
+ private long processingNanos;
+ private long aggregateLineageMillis;
+ private int flowFilesReceived;
+ private long bytesReceived;
+ private int flowFilesSent;
+ private long bytesSent;
+ private int invocations;
+ private Map<String, Long> counters;
+
+ public StandardFlowFileEvent(final String componentId) {
+ this.componentId = componentId;
+ }
+
+ @Override
+ public String getComponentIdentifier() {
+ return componentId;
+ }
+
+ @Override
+ public int getFlowFilesIn() {
+ return flowFilesIn;
+ }
+
+ public void setFlowFilesIn(int flowFilesIn) {
+ this.flowFilesIn = flowFilesIn;
+ }
+
+ @Override
+ public int getFlowFilesOut() {
+ return flowFilesOut;
+ }
+
+ public void setFlowFilesOut(int flowFilesOut) {
+ this.flowFilesOut = flowFilesOut;
+ }
+
+ @Override
+ public long getContentSizeIn() {
+ return contentSizeIn;
+ }
+
+ public void setContentSizeIn(long contentSizeIn) {
+ this.contentSizeIn = contentSizeIn;
+ }
+
+ @Override
+ public long getContentSizeOut() {
+ return contentSizeOut;
+ }
+
+ public void setContentSizeOut(long contentSizeOut) {
+ this.contentSizeOut = contentSizeOut;
+ }
+
+ @Override
+ public long getContentSizeRemoved() {
+ return contentSizeRemoved;
+ }
+
+ public void setContentSizeRemoved(final long contentSizeRemoved) {
+ this.contentSizeRemoved = contentSizeRemoved;
+ }
+
+ @Override
+ public int getFlowFilesRemoved() {
+ return flowFilesRemoved;
+ }
+
+ public void setFlowFilesRemoved(final int flowFilesRemoved) {
+ this.flowFilesRemoved = flowFilesRemoved;
+ }
+
+ @Override
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ public void setBytesRead(long bytesRead) {
+ this.bytesRead = bytesRead;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ public void setBytesWritten(long bytesWritten) {
+ this.bytesWritten = bytesWritten;
+ }
+
+ @Override
+ public long getProcessingNanoseconds() {
+ return processingNanos;
+ }
+
+ public void setProcessingNanos(final long processingNanos) {
+ this.processingNanos = processingNanos;
+ }
+
+ @Override
+ public int getInvocations() {
+ return invocations;
+ }
+
+ public void setInvocations(final int invocations) {
+ this.invocations = invocations;
+ }
+
+ @Override
+ public int getFlowFilesReceived() {
+ return flowFilesReceived;
+ }
+
+ public void setFlowFilesReceived(int flowFilesReceived) {
+ this.flowFilesReceived = flowFilesReceived;
+ }
+
+ @Override
+ public long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public void setBytesReceived(long bytesReceived) {
+ this.bytesReceived = bytesReceived;
+ }
+
+ @Override
+ public int getFlowFilesSent() {
+ return flowFilesSent;
+ }
+
+ public void setFlowFilesSent(int flowFilesSent) {
+ this.flowFilesSent = flowFilesSent;
+ }
+
+ @Override
+ public long getBytesSent() {
+ return bytesSent;
+ }
+
+ public void setBytesSent(long bytesSent) {
+ this.bytesSent = bytesSent;
+ }
+
+ @Override
+ public long getAverageLineageMillis() {
+ if (flowFilesOut == 0 && flowFilesRemoved == 0) {
+ return 0L;
+ }
+
+ return aggregateLineageMillis / (flowFilesOut + flowFilesRemoved);
+ }
+
+ public void setAggregateLineageMillis(long lineageMilliseconds) {
+ this.aggregateLineageMillis = lineageMilliseconds;
+ }
+
+ @Override
+ public long getAggregateLineageMillis() {
+ return aggregateLineageMillis;
+ }
+
+ @Override
+ public Map<String, Long> getCounters() {
+ return counters;
+ }
+
+ public void setCounters(final Map<String, Long> counters) {
+ this.counters = counters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 22684a6..0af8657 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -33,9 +33,9 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.ProcessContext;
-import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 756e576..a126589 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -37,11 +38,25 @@ public class StatusHistoryUtil {
final Set<MetricDescriptor<?>> metricDescriptors = new LinkedHashSet<>();
final LinkedHashMap<String, String> componentDetails = new LinkedHashMap<>(statusHistory.getComponentDetails());
+ final Set<String> metricNames = new HashSet<>();
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
- snapshotDtos.add(StatusHistoryUtil.createStatusSnapshotDto(snapshot));
+ final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot);
+ snapshotDtos.add(snapshotDto);
+ metricNames.addAll(snapshotDto.getStatusMetrics().keySet());
metricDescriptors.addAll(snapshot.getStatusMetrics().keySet());
}
+ // We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters.
+ // So for any metric that is not in the aggregate snapshot, add it with a value of 0
+ for (final StatusSnapshotDTO snapshotDto : snapshotDtos) {
+ final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
+ for (final String metricName : metricNames) {
+ if (!metrics.containsKey(metricName)) {
+ metrics.put(metricName, 0L);
+ }
+ }
+ }
+
final StatusHistoryDTO dto = new StatusHistoryDTO();
dto.setGenerated(new Date());
dto.setComponentDetails(componentDetails);
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 3203972..50a5123 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -20,6 +20,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.util.ComponentStatusReport;
import org.apache.nifi.util.ComponentStatusReport.ComponentType;
import org.apache.nifi.util.NiFiProperties;
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
+import java.util.Map;
public class VolatileComponentStatusRepository implements ComponentStatusRepository {
@@ -72,7 +74,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
}
@Override
- public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints) {
+ public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, processorId);
@@ -98,6 +100,21 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
}
}
+ if (includeCounters) {
+ final Map<String, Long> counters = status.getCounters();
+ if (counters != null) {
+ for (final Map.Entry<String, Long> entry : counters.entrySet()) {
+ final String counterName = entry.getKey();
+
+ final String label = entry.getKey() + " (5 mins)";
+ final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT,
+ s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
+
+ snapshot.addStatusMetric(metricDescriptor, entry.getValue());
+ }
+ }
+ }
+
history.addStatusSnapshot(snapshot);
return true;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/695e8aa9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index f2a7eee..8c27078 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -25,9 +25,9 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.ProcessContext;
-import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;