You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/03 21:19:18 UTC
nifi git commit: NIFI-3259 - Process group level option in Ambari reporting task
Repository: nifi
Updated Branches:
refs/heads/master 9b47961d1 -> 2c0f1c348
NIFI-3259 - Process group level option in Ambari reporting task
This closes #1360.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2c0f1c34
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2c0f1c34
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2c0f1c34
Branch: refs/heads/master
Commit: 2c0f1c348e15b35355ec2a1e2ff5503c331c7ad9
Parents: 9b47961
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Dec 27 20:16:32 2016 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 3 16:18:53 2017 -0500
----------------------------------------------------------------------
.../org/apache/nifi/reporting/EventAccess.java | 5 ++
.../org/apache/nifi/util/MockEventAccess.java | 6 ++
.../reporting/ambari/AmbariReportingTask.java | 60 +++++++++++++-------
.../reporting/ambari/metrics/MetricNames.java | 3 +
.../ambari/metrics/MetricsService.java | 34 +++++++----
.../ambari/TestAmbariReportingTask.java | 3 +
.../ambari/metrics/TestMetricsService.java | 40 ++++++++++++-
.../apache/nifi/controller/FlowController.java | 1 +
8 files changed, 117 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
index 70dedfa..c219032 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
@@ -32,6 +32,11 @@ public interface EventAccess {
ProcessGroupStatus getControllerStatus();
/**
+ * @return the status of all components in the specified group.
+ */
+ ProcessGroupStatus getGroupStatus(final String groupId);
+
+ /**
* Convenience method to obtain Provenance Events starting with (and
* including) the given ID. If no event exists with that ID, the first event
* to be returned will have an ID greater than <code>firstEventId</code>.
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
index 5c9a0ed..2a2aab2 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
@@ -42,6 +42,11 @@ public class MockEventAccess implements EventAccess {
}
@Override
+ public ProcessGroupStatus getGroupStatus(final String groupId) {
+ return null;
+ }
+
+ @Override
public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException {
if (firstEventId < 0 || maxRecords < 1) {
throw new IllegalArgumentException();
@@ -65,6 +70,7 @@ public class MockEventAccess implements EventAccess {
this.provenanceRecords.add(record);
}
+ @Override
public ProvenanceEventRepository getProvenanceRepository() {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 5080583..0ad4345 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -80,6 +80,15 @@ public class AmbariReportingTask extends AbstractReportingTask {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
+ .name("Process Group ID")
+ .description("If specified, the reporting task will send metrics about this process group only. If"
+ + " not, the root process group is used and global metrics are sent.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
private volatile Client client;
private volatile JsonBuilderFactory factory;
private volatile VirtualMachineMetrics virtualMachineMetrics;
@@ -93,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
properties.add(METRICS_COLLECTOR_URL);
properties.add(APPLICATION_ID);
properties.add(HOSTNAME);
+ properties.add(PROCESS_GROUP_ID);
return properties;
}
@@ -112,12 +122,12 @@ public class AmbariReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
- final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
- .evaluateAttributeExpressions().getValue();
- final String applicationId = context.getProperty(APPLICATION_ID)
- .evaluateAttributeExpressions().getValue();
- final String hostname = context.getProperty(HOSTNAME)
- .evaluateAttributeExpressions().getValue();
+ final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue();
+ final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
+ final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+
+ final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet();
+ final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null;
final long start = System.currentTimeMillis();
@@ -140,22 +150,28 @@ public class AmbariReportingTask extends AbstractReportingTask {
}
// calculate the current metrics, but store them to be sent next time
- final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
- final Map<String,String> statusMetrics = metricsService.getMetrics(status);
- final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
-
- final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
-
- final JsonObject metricsObject = metricsBuilder
- .applicationId(applicationId)
- .instanceId(status.getId())
- .hostname(hostname)
- .timestamp(start)
- .addAllMetrics(statusMetrics)
- .addAllMetrics(jvmMetrics)
- .build();
-
- previousMetrics = metricsObject;
+ final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId);
+
+ if(status != null) {
+ final Map<String,String> statusMetrics = metricsService.getMetrics(status, pgIdIsSet);
+ final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+
+ final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
+
+ final JsonObject metricsObject = metricsBuilder
+ .applicationId(applicationId)
+ .instanceId(status.getId())
+ .hostname(hostname)
+ .timestamp(start)
+ .addAllMetrics(statusMetrics)
+ .addAllMetrics(jvmMetrics)
+ .build();
+
+ previousMetrics = metricsObject;
+ } else {
+ getLogger().error("No process group status with ID = {}", new Object[]{processGroupId});
+ previousMetrics = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
index 4b62668..20cfa4e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
@@ -21,6 +21,9 @@ package org.apache.nifi.reporting.ambari.metrics;
*/
public interface MetricNames {
+ // Metric Name separator
+ String METRIC_NAME_SEPARATOR = ".";
+
// NiFi Metrics
String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
index 0671dab..cef257d 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
@@ -33,25 +33,26 @@ public class MetricsService {
* Generates a Map of metrics for a ProcessGroupStatus instance.
*
* @param status a ProcessGroupStatus to get metrics from
+ * @param appendPgId if true, the process group ID will be appended at the end of the metric name
* @return a map of metrics for the given status
*/
- public Map<String,String> getMetrics(ProcessGroupStatus status) {
+ public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
final Map<String,String> metrics = new HashMap<>();
- metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived()));
- metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived()));
- metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent()));
- metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent()));
- metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount()));
- metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize()));
- metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead()));
- metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten()));
- metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount()));
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
+ metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
+ metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
+ metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
+ metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
+ metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
+ metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
+ metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
final long durationNanos = calculateProcessingNanos(status);
- metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, String.valueOf(durationNanos));
+ metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
- metrics.put(MetricNames.TOTAL_TASK_DURATION_SECONDS, String.valueOf(durationSeconds));
+ metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
return metrics;
}
@@ -118,4 +119,13 @@ public class MetricsService {
return nanos;
}
+ // append the process group ID if necessary
+ private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
+ if(appendPgId) {
+ return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+ } else {
+ return name;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
index 5e23bf7..8cf0212 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
@@ -110,6 +110,8 @@ public class TestAmbariReportingTask {
.thenReturn(new MockPropertyValue(applicationId));
Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
.thenReturn(new MockPropertyValue(hostName));
+ Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID))
+ .thenReturn(new MockPropertyValue("1234"));
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@@ -122,6 +124,7 @@ public class TestAmbariReportingTask {
task.setup(configurationContext);
task.onTrigger(context);
}
+
// override the creation of the client to provide a mock
private class TestableAmbariReportingTask extends AmbariReportingTask {
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index e8cc792..93224eb 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -60,7 +60,7 @@ public class TestMetricsService {
final MetricsService service = new MetricsService();
- final Map<String,String> metrics = service.getMetrics(status);
+ final Map<String,String> metrics = service.getMetrics(status, false);
Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
@@ -76,6 +76,44 @@ public class TestMetricsService {
}
@Test
+ public void testGetProcessGroupStatusMetricsWithID() {
+ ProcessGroupStatus status = new ProcessGroupStatus();
+ String id = "1234";
+ status.setId(id);
+ status.setFlowFilesReceived(5);
+ status.setBytesReceived(10000);
+ status.setFlowFilesSent(10);
+ status.setBytesSent(20000);
+ status.setQueuedCount(100);
+ status.setQueuedContentSize(1024L);
+ status.setBytesRead(60000L);
+ status.setBytesWritten(80000L);
+ status.setActiveThreadCount(5);
+
+ // create a processor status with processing time
+ ProcessorStatus procStatus = new ProcessorStatus();
+ procStatus.setProcessingNanos(123456789);
+
+ Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
+ processorStatuses.add(procStatus);
+ status.setProcessorStatus(processorStatuses);
+
+ // create a group status with processing time
+ ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+ groupStatus.setProcessorStatus(processorStatuses);
+
+ Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
+ groupStatuses.add(groupStatus);
+ status.setProcessGroupStatus(groupStatuses);
+
+ final MetricsService service = new MetricsService();
+
+ final Map<String,String> metrics = service.getMetrics(status, true);
+
+ Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id));
+ }
+
+ @Test
public void testGetVirtualMachineMetrics() {
final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
final MetricsService service = new MetricsService();
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 59ea8fd..7fd85b9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2246,6 +2246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @param groupId group id
* @return the component status
*/
+ @Override
public ProcessGroupStatus getGroupStatus(final String groupId) {
return getGroupStatus(groupId, getProcessorStats());
}