You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/15 15:25:42 UTC
nifi git commit: NIFI-2194 This closes #659. Caching metrics in
AmbariReportingTask so each iteration sends last iteration's metrics
Repository: nifi
Updated Branches:
refs/heads/master 54574e388 -> afc038d2c
NIFI-2194 This closes #659. Caching metrics in AmbariReportingTask so each iteration sends last iteration's metrics
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afc038d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afc038d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afc038d2
Branch: refs/heads/master
Commit: afc038d2c021d224d575ad4bbb05a7840d679bb7
Parents: 54574e3
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Jul 13 17:53:54 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Jul 15 11:25:22 2016 -0400
----------------------------------------------------------------------
.../reporting/ambari/AmbariReportingTask.java | 45 ++++++++++++--------
1 file changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/afc038d2/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index cff0b48..7092dd0 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -47,7 +47,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tags({"reporting", "ambari", "metrics"})
-@CapabilityDescription("Publishes metrics from NiFi to Ambari")
+@CapabilityDescription("Publishes metrics from NiFi to Ambari Metrics Service (AMS). Due to how the Ambari Metrics Service " +
+ "works, this reporting task should be scheduled to run every 60 seconds. Each iteration it will send the metrics " +
+ "from the previous iteration, and calculate the current metrics to be sent on next iteration. Scheduling this reporting " +
+ "task at a frequency other than 60 seconds may produce unexpected results.")
public class AmbariReportingTask extends AbstractReportingTask {
static final PropertyDescriptor METRICS_COLLECTOR_URL = new PropertyDescriptor.Builder()
@@ -80,6 +83,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
private volatile Client client;
private volatile JsonBuilderFactory factory;
private volatile VirtualMachineMetrics virtualMachineMetrics;
+ private volatile JsonObject previousMetrics = null;
private final MetricsService metricsService = new MetricsService();
@@ -98,6 +102,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
factory = Json.createBuilderFactory(config);
client = createClient();
virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+ previousMetrics = null;
}
// used for testing to allow tests to override the client
@@ -107,8 +112,6 @@ public class AmbariReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
- final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
-
final String metricsCollectorUrl = context.getProperty(METRICS_COLLECTOR_URL)
.evaluateAttributeExpressions().getValue();
final String applicationId = context.getProperty(APPLICATION_ID)
@@ -118,6 +121,27 @@ public class AmbariReportingTask extends AbstractReportingTask {
final long start = System.currentTimeMillis();
+ // send the metrics from last execution
+ if (previousMetrics != null) {
+ final WebTarget metricsTarget = client.target(metricsCollectorUrl);
+ final Invocation.Builder invocation = metricsTarget.request();
+
+ final Entity<String> entity = Entity.json(previousMetrics.toString());
+ getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
+
+ final Response response = invocation.post(entity);
+ if (response.getStatus() == Response.Status.OK.getStatusCode()) {
+ final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
+ getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[]{completedMillis});
+ } else {
+ final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
+ getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
+ }
+
+ }
+
+ // calculate the current metrics, but store them to be sent next time
+ final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
final Map<String,String> statusMetrics = metricsService.getMetrics(status);
final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
@@ -132,20 +156,7 @@ public class AmbariReportingTask extends AbstractReportingTask {
.addAllMetrics(jvmMetrics)
.build();
- final WebTarget metricsTarget = client.target(metricsCollectorUrl);
- final Invocation.Builder invocation = metricsTarget.request();
-
- final Entity<String> entity = Entity.json(metricsObject.toString());
- getLogger().debug("Sending metrics {} to Ambari", new Object[]{entity.getEntity()});
-
- final Response response = invocation.post(entity);
- if (response.getStatus() == Response.Status.OK.getStatusCode()) {
- final long completedMillis = TimeUnit.NANOSECONDS.toMillis(System.currentTimeMillis() - start);
- getLogger().info("Successfully sent metrics to Ambari in {} ms", new Object[] {completedMillis});
- } else {
- final String responseEntity = response.hasEntity() ? response.readEntity(String.class) : "unknown error";
- getLogger().error("Error sending metrics to Ambari due to {} - {}", new Object[]{response.getStatus(), responseEntity});
- }
+ previousMetrics = metricsObject;
}
}