You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/03 21:19:18 UTC

nifi git commit: NIFI-3259 - Process group level option in Ambari reporting task

Repository: nifi
Updated Branches:
  refs/heads/master 9b47961d1 -> 2c0f1c348


NIFI-3259 - Process group level option in Ambari reporting task

This closes #1360.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2c0f1c34
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2c0f1c34
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2c0f1c34

Branch: refs/heads/master
Commit: 2c0f1c348e15b35355ec2a1e2ff5503c331c7ad9
Parents: 9b47961
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Dec 27 20:16:32 2016 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 3 16:18:53 2017 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/reporting/EventAccess.java  |  5 ++
 .../org/apache/nifi/util/MockEventAccess.java   |  6 ++
 .../reporting/ambari/AmbariReportingTask.java   | 60 +++++++++++++-------
 .../reporting/ambari/metrics/MetricNames.java   |  3 +
 .../ambari/metrics/MetricsService.java          | 34 +++++++----
 .../ambari/TestAmbariReportingTask.java         |  3 +
 .../ambari/metrics/TestMetricsService.java      | 40 ++++++++++++-
 .../apache/nifi/controller/FlowController.java  |  1 +
 8 files changed, 117 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
index 70dedfa..c219032 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java
@@ -32,6 +32,11 @@ public interface EventAccess {
     ProcessGroupStatus getControllerStatus();
 
     /**
+     * @return the status of all components in the specified group.
+     */
+    ProcessGroupStatus getGroupStatus(final String groupId);
+
+    /**
      * Convenience method to obtain Provenance Events starting with (and
      * including) the given ID. If no event exists with that ID, the first event
      * to be returned will be have an ID greater than <code>firstEventId</code>.

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
index 5c9a0ed..2a2aab2 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java
@@ -42,6 +42,11 @@ public class MockEventAccess implements EventAccess {
     }
 
     @Override
+    public ProcessGroupStatus getGroupStatus(final String groupId) {
+        return null;
+    }
+
+    @Override
     public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException {
         if (firstEventId < 0 || maxRecords < 1) {
             throw new IllegalArgumentException();
@@ -65,6 +70,7 @@ public class MockEventAccess implements EventAccess {
         this.provenanceRecords.add(record);
     }
 
+    @Override
     public ProvenanceEventRepository getProvenanceRepository() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 5080583..0ad4345 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -80,6 +80,15 @@ public class AmbariReportingTask extends AbstractReportingTask {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder()
+            .name("Process Group ID")
+            .description("If specified, the reporting task will send metrics about this process group only. If"
+                    + " not, the root process group is used and global metrics are sent.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     private volatile Client client;
     private volatile JsonBuilderFactory factory;
     private volatile VirtualMachineMetrics virtualMachineMetrics;
@@ -93,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
         properties.add(METRICS_COLLECTOR_URL);
         properties.add(APPLICATION_ID);
         properties.add(HOSTNAME);
+        properties.add(PROCESS_GROUP_ID);
         return properties;
     }
 
@@ -112,12 +122,12 @@ public class AmbariReportingTask extends AbstractReportingTask {
 
     @Override
     public void onTrigger(final ReportingContext context) {
-        final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
-                .evaluateAttributeExpressions().getValue();
-        final String applicationId = context.getProperty(APPLICATION_ID)
-                .evaluateAttributeExpressions().getValue();
-        final String hostname = context.getProperty(HOSTNAME)
-                .evaluateAttributeExpressions().getValue();
+        final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL).evaluateAttributeExpressions().getValue();
+        final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+
+        final boolean pgIdIsSet = context.getProperty(PROCESS_GROUP_ID).isSet();
+        final String processGroupId = pgIdIsSet ? context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue() : null;
 
         final long start = System.currentTimeMillis();
 
@@ -140,22 +150,28 @@ public class AmbariReportingTask extends AbstractReportingTask {
         }
 
         // calculate the current metrics, but store them to be sent next time
-        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
-        final Map<String,String> statusMetrics = metricsService.getMetrics(status);
-        final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
-
-        final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
-
-        final JsonObject metricsObject = metricsBuilder
-                .applicationId(applicationId)
-                .instanceId(status.getId())
-                .hostname(hostname)
-                .timestamp(start)
-                .addAllMetrics(statusMetrics)
-                .addAllMetrics(jvmMetrics)
-                .build();
-
-        previousMetrics = metricsObject;
+        final ProcessGroupStatus status = processGroupId == null ? context.getEventAccess().getControllerStatus() : context.getEventAccess().getGroupStatus(processGroupId);
+
+        if(status != null) {
+            final Map<String,String> statusMetrics = metricsService.getMetrics(status, pgIdIsSet);
+            final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+
+            final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
+
+            final JsonObject metricsObject = metricsBuilder
+                    .applicationId(applicationId)
+                    .instanceId(status.getId())
+                    .hostname(hostname)
+                    .timestamp(start)
+                    .addAllMetrics(statusMetrics)
+                    .addAllMetrics(jvmMetrics)
+                    .build();
+
+            previousMetrics = metricsObject;
+        } else {
+            getLogger().error("No process group status with ID = {}", new Object[]{processGroupId});
+            previousMetrics = null;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
index 4b62668..20cfa4e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
@@ -21,6 +21,9 @@ package org.apache.nifi.reporting.ambari.metrics;
  */
 public interface MetricNames {
 
+    // Metric Name separator
+    String METRIC_NAME_SEPARATOR = ".";
+
     // NiFi Metrics
     String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
     String BYTES_RECEIVED = "BytesReceivedLast5Minutes";

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
index 0671dab..cef257d 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
@@ -33,25 +33,26 @@ public class MetricsService {
      * Generates a Map of metrics for a ProcessGroupStatus instance.
      *
      * @param status a ProcessGroupStatus to get metrics from
+     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
      * @return a map of metrics for the given status
      */
-    public Map<String,String> getMetrics(ProcessGroupStatus status) {
+    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
         final Map<String,String> metrics = new HashMap<>();
-        metrics.put(MetricNames.FLOW_FILES_RECEIVED, String.valueOf(status.getFlowFilesReceived()));
-        metrics.put(MetricNames.BYTES_RECEIVED, String.valueOf(status.getBytesReceived()));
-        metrics.put(MetricNames.FLOW_FILES_SENT, String.valueOf(status.getFlowFilesSent()));
-        metrics.put(MetricNames.BYTES_SENT, String.valueOf(status.getBytesSent()));
-        metrics.put(MetricNames.FLOW_FILES_QUEUED, String.valueOf(status.getQueuedCount()));
-        metrics.put(MetricNames.BYTES_QUEUED, String.valueOf(status.getQueuedContentSize()));
-        metrics.put(MetricNames.BYTES_READ, String.valueOf(status.getBytesRead()));
-        metrics.put(MetricNames.BYTES_WRITTEN, String.valueOf(status.getBytesWritten()));
-        metrics.put(MetricNames.ACTIVE_THREADS, String.valueOf(status.getActiveThreadCount()));
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
+        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
+        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
+        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
+        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
+        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
+        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
 
         final long durationNanos = calculateProcessingNanos(status);
-        metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, String.valueOf(durationNanos));
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
 
         final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-        metrics.put(MetricNames.TOTAL_TASK_DURATION_SECONDS, String.valueOf(durationSeconds));
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
 
         return metrics;
     }
@@ -118,4 +119,13 @@ public class MetricsService {
         return nanos;
     }
 
+    // append the process group ID if necessary
+    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
+        if(appendPgId) {
+            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+        } else {
+            return name;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
index 5e23bf7..8cf0212 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/TestAmbariReportingTask.java
@@ -110,6 +110,8 @@ public class TestAmbariReportingTask {
                 .thenReturn(new MockPropertyValue(applicationId));
         Mockito.when(context.getProperty(AmbariReportingTask.HOSTNAME))
                 .thenReturn(new MockPropertyValue(hostName));
+        Mockito.when(context.getProperty(AmbariReportingTask.PROCESS_GROUP_ID))
+                .thenReturn(new MockPropertyValue("1234"));
 
 
         final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@@ -122,6 +124,7 @@ public class TestAmbariReportingTask {
         task.setup(configurationContext);
         task.onTrigger(context);
     }
+
     // override the creation of the client to provide a mock
     private class TestableAmbariReportingTask extends AmbariReportingTask {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index e8cc792..93224eb 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -60,7 +60,7 @@ public class TestMetricsService {
 
         final MetricsService service = new MetricsService();
 
-        final Map<String,String> metrics = service.getMetrics(status);
+        final Map<String,String> metrics = service.getMetrics(status, false);
 
         Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
         Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
@@ -76,6 +76,44 @@ public class TestMetricsService {
     }
 
     @Test
+    public void testGetProcessGroupStatusMetricsWithID() {
+        ProcessGroupStatus status = new ProcessGroupStatus();
+        String id = "1234";
+        status.setId(id);
+        status.setFlowFilesReceived(5);
+        status.setBytesReceived(10000);
+        status.setFlowFilesSent(10);
+        status.setBytesSent(20000);
+        status.setQueuedCount(100);
+        status.setQueuedContentSize(1024L);
+        status.setBytesRead(60000L);
+        status.setBytesWritten(80000L);
+        status.setActiveThreadCount(5);
+
+        // create a processor status with processing time
+        ProcessorStatus procStatus = new ProcessorStatus();
+        procStatus.setProcessingNanos(123456789);
+
+        Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        status.setProcessorStatus(processorStatuses);
+
+        // create a group status with processing time
+        ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+        groupStatus.setProcessorStatus(processorStatuses);
+
+        Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
+        groupStatuses.add(groupStatus);
+        status.setProcessGroupStatus(groupStatuses);
+
+        final MetricsService service = new MetricsService();
+
+        final Map<String,String> metrics = service.getMetrics(status, true);
+
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED + MetricNames.METRIC_NAME_SEPARATOR + id));
+    }
+
+    @Test
     public void testGetVirtualMachineMetrics() {
         final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
         final MetricsService service = new MetricsService();

http://git-wip-us.apache.org/repos/asf/nifi/blob/2c0f1c34/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 59ea8fd..7fd85b9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2246,6 +2246,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @param groupId group id
      * @return the component status
      */
+    @Override
     public ProcessGroupStatus getGroupStatus(final String groupId) {
         return getGroupStatus(groupId, getProcessorStats());
     }