You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/15 15:25:42 UTC

nifi git commit: NIFI-2194 This closes #659. Caching metrics in AmbariReportingTask so each iteration sends last iteration's metrics

Repository: nifi
Updated Branches:
  refs/heads/master 54574e388 -> afc038d2c


NIFI-2194 This closes #659. Caching metrics in AmbariReportingTask so each iteration sends last iteration's metrics


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afc038d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afc038d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afc038d2

Branch: refs/heads/master
Commit: afc038d2c021d224d575ad4bbb05a7840d679bb7
Parents: 54574e3
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Jul 13 17:53:54 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Jul 15 11:25:22 2016 -0400

----------------------------------------------------------------------
 .../reporting/ambari/AmbariReportingTask.java   | 45 ++++++++++++--------
 1 file changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/afc038d2/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index cff0b48..7092dd0 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -47,7 +47,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"reporting", "ambari", "metrics"})
-@CapabilityDescription("Publishes metrics from NiFi to Ambari")
+@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " +
+        "works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " +
+        "from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " +
+        "task at a frequency other than 60 seconds may produce unexpected results.")
 public class AmbariReportingTask extends AbstractReportingTask {
 
     static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
@@ -80,6 +83,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
     private volatile Client client;
     private volatile JsonBuilderFactory factory;
     private volatile VirtualMachineMetrics virtualMachineMetrics;
+    private volatile JsonObject previousMetrics = null;
 
     private final MetricsService metricsService = new MetricsService();
 
@@ -98,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
         factory = Json.createBuilderFactory(config);
         client = createClient();
         virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+        previousMetrics = null;
     }
 
     // used for testing to allow tests to override the client
@@ -107,8 +112,6 @@ public class AmbariReportingTask extends AbstractReportingTask {
 
     @Override
     public void onTrigger(final ReportingContext context) {
-        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
-
         final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
                 .evaluateAttributeExpressions().getValue();
         final String applicationId = context.getProperty(APPLICATION_ID)
@@ -118,6 +121,27 @@ public class AmbariReportingTask extends AbstractReportingTask {
 
         final long start = System.currentTimeMillis();
 
+        // send the metrics from last execution
+        if (previousMetrics != null) {
+            final WebTarget metricsTarget = client.target(metricsCollectorUrl);
+            final Invocation.Builder invocation = metricsTarget.request();
+
+            final Entity<String> entity = Entity.json(previousMetrics.toString());
+            getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
+
+            final Response response = invocation.post(entity);
+            if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+                final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
+                getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis});
+            } else {
+                final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
+                getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
+            }
+
+        }
+
+        // calculate the current metrics, but store them to be sent next time
+        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
         final Map<String,String> statusMetrics = metricsService.getMetrics(status);
         final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
 
@@ -132,20 +156,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
                 .addAllMetrics(jvmMetrics)
                 .build();
 
-        final WebTarget metricsTarget = client.target(metricsCollectorUrl);
-        final Invocation.Builder invocation = metricsTarget.request();
-
-        final Entity<String> entity = Entity.json(metricsObject.toString());
-        getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
-
-        final Response response = invocation.post(entity);
-        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
-            final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
-            getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis});
-        } else {
-            final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
-            getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
-        }
+        previousMetrics = metricsObject;
     }
 
 }