You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/11 19:01:17 UTC

[1/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Repository: nifi
Updated Branches:
  refs/heads/master ce0855e98 -> 6fbe1515e


http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
new file mode 100644
index 0000000..20416e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+@Tags({"status", "metrics", "site", "site to site"})
+@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
+public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
+
+    /** Allowable value of the Output Format property selecting the Ambari Metrics API layout. */
+    static final AllowableValue AMBARI_FORMAT = new AllowableValue(
+            "ambari-format",
+            "Ambari Format",
+            "Metrics will be formatted according to the Ambari Metrics API."
+                    + " See Additional Details in Usage documentation.");
+
+    /** Allowable value of the Output Format property selecting output through the Record Writer. */
+    static final AllowableValue RECORD_FORMAT = new AllowableValue(
+            "record-format",
+            "Record Format",
+            "Metrics will be formatted using the Record Writer property of this reporting task."
+                    + " See Additional Details in Usage documentation to have the description of the default schema.");
+
+    /** Required property holding the application ID included in the metrics; defaults to "nifi". */
+    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-application-id")
+            .displayName("Application ID")
+            .description("The Application ID to be included in the metrics")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Required property holding the hostname included in the metrics; defaults to an expression. */
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-hostname")
+            .displayName("Hostname")
+            .description("The Hostname of this NiFi instance to be included in the metrics")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("${hostname(true)}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Required property choosing between {@link #AMBARI_FORMAT} (default) and {@link #RECORD_FORMAT}. */
+    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-format")
+            .displayName("Output Format")
+            .description("The output format that will be used for the metrics. If "
+                    + RECORD_FORMAT.getDisplayName()
+                    + " is selected, a Record Writer must be provided. If "
+                    + AMBARI_FORMAT.getDisplayName()
+                    + " is selected, the Record Writer property should be empty.")
+            .required(true)
+            .allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
+            .defaultValue(AMBARI_FORMAT.getValue())
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Helper used by onTrigger to turn the controller status and JVM state into metrics. */
+    private final MetricsService metricsService = new MetricsService();
+
+    /**
+     * Creates the reporting task and loads the default record schema.
+     *
+     * <p>The Avro schema is read from the {@code schema-metrics.avsc} classpath resource, converted
+     * with {@code AvroTypeUtil.createSchema} and stored in {@code recordSchema}.</p>
+     *
+     * @throws IOException if the schema resource is missing from the classpath or cannot be read
+     */
+    public SiteToSiteMetricsReportingTask() throws IOException {
+        // try-with-resources guarantees the classpath stream is closed, even when parsing fails
+        try (final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc")) {
+            if (schema == null) {
+                // getResourceAsStream reports a missing resource with null; fail with a clear message
+                // instead of letting the parser throw an opaque NullPointerException
+                throw new IOException("Unable to find resource schema-metrics.avsc on the classpath");
+            }
+            recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
+        }
+    }
+
+    /**
+     * Lists the properties exposed by this reporting task: everything the parent class supports,
+     * followed by the hostname, application ID, output format and record writer properties, with
+     * the batch size property removed.
+     *
+     * @return a new mutable list of the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        Collections.addAll(descriptors, HOSTNAME, APPLICATION_ID, FORMAT, RECORD_WRITER);
+        descriptors.remove(BATCH_SIZE);
+        return descriptors;
+    }
+
+    /**
+     * Adds a consistency check between the Output Format and Record Writer properties to the
+     * validation performed by the parent class: the record format requires a writer, while the
+     * Ambari format forbids one.
+     *
+     * @param validationContext context giving access to the configured property values
+     * @return the parent's validation results, plus at most one result about the Record Writer
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean writerConfigured = validationContext.getProperty(RECORD_WRITER).isSet();
+        final String selectedFormat = validationContext.getProperty(FORMAT).getValue();
+        final boolean recordSelected = selectedFormat.equals(RECORD_FORMAT.getValue());
+        final boolean ambariSelected = selectedFormat.equals(AMBARI_FORMAT.getValue());
+
+        // The two failure cases cannot occur together: one needs a writer, the other needs none.
+        final String explanation;
+        if (recordSelected && !writerConfigured) {
+            explanation = "If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.";
+        } else if (ambariSelected && writerConfigured) {
+            explanation = "If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set.";
+        } else {
+            explanation = null;
+        }
+
+        if (explanation != null) {
+            final ValidationResult failure = new ValidationResult.Builder()
+                    .input("Record Writer")
+                    .valid(false)
+                    .explanation(explanation)
+                    .build();
+            results.add(failure);
+        }
+
+        return results;
+    }
+
+    /**
+     * Gathers the controller status and JVM metrics and sends them, as a single packet, to the
+     * Site-to-Site destination obtained from {@code getClient()}.
+     *
+     * <p>Nothing is sent when the instance is clustered but has no node identifier yet, when no
+     * controller status is available, or when no transaction can be created. With the Ambari
+     * format the payload is the JSON built by {@link MetricsBuilder} and the MIME type attribute is
+     * set to {@code application/json}; otherwise the JSON produced by the metrics service is passed
+     * to {@code getData(...)} (defined outside this class; presumably it applies the configured
+     * Record Writer - confirm against the parent class).</p>
+     *
+     * @param context the reporting context giving access to properties, cluster state and status
+     * @throws ProcessException if creating the transaction or sending the metrics fails
+     */
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                    + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+
+        final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+
+        // Guard clause: without a controller status there is nothing to report.
+        if (status == null) {
+            getLogger().error("No process group status to retrieve metrics");
+            return;
+        }
+
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        final double systemLoad = os.getSystemLoadAverage();
+        // Any negative load reported by the JVM is normalized to -1.
+        final double loadAverage = systemLoad >= 0 ? systemLoad : -1;
+
+        final byte[] data;
+        final Map<String, String> attributes = new HashMap<>();
+
+        if (context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) {
+            // These maps and the builder are only needed for the Ambari layout, so build them here.
+            final Map<String, String> statusMetrics = metricsService.getMetrics(status, false);
+            final Map<String, String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+
+            final JsonObject metricsObject = new MetricsBuilder(factory)
+                    .applicationId(applicationId)
+                    .instanceId(status.getId())
+                    .hostname(hostname)
+                    .timestamp(System.currentTimeMillis())
+                    .addAllMetrics(statusMetrics)
+                    .addAllMetrics(jvmMetrics)
+                    .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors()))
+                    .metric(MetricNames.LOAD1MN, String.valueOf(loadAverage))
+                    .build();
+
+            data = metricsObject.toString().getBytes(StandardCharsets.UTF_8);
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        } else {
+            final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(),
+                    hostname, System.currentTimeMillis(), os.getAvailableProcessors(), loadAverage);
+            data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
+        }
+
+        // Declared outside the try block so the catch clause can flag it as failed.
+        Transaction transaction = null;
+        try {
+            final long start = System.nanoTime();
+            transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final String transactionId = UUID.randomUUID().toString();
+            attributes.put("reporting.task.transaction.id", transactionId);
+            attributes.put("reporting.task.name", getName());
+            attributes.put("reporting.task.uuid", getIdentifier());
+            attributes.put("reporting.task.type", this.getClass().getSimpleName());
+
+            transaction.send(data, attributes);
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
+        } catch (final Exception e) {
+            // Mark the transaction as failed so it is not left in a non-terminal state.
+            if (transaction != null) {
+                transaction.error();
+            }
+            throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 6db30b8..ec1414d 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -46,7 +46,6 @@ import javax.json.JsonArrayBuilder;
 import javax.json.JsonBuilderFactory;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
-import javax.json.JsonValue;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -76,9 +75,6 @@ import java.util.concurrent.TimeUnit;
 )
 public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream",
         "Start reading provenance Events from the beginning of the stream (the oldest event first)");
     static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
@@ -307,7 +303,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
     }
 
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df,
                                 final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName,
                                 final String platform, final String nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());
@@ -371,13 +367,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         builder.add(key, mapBuilder);
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
+    private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) {
         if (values == null) {
             return;
         }
@@ -385,20 +375,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         builder.add(key, createJsonArray(factory, values));
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
-        addField(builder, key, value, false);
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
-        if (value == null) {
-            if (allowNullValues) {
-                builder.add(key, JsonValue.NULL);
-            }
-        } else {
-            builder.add(key, value);
-        }
-    }
-
     private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
         final JsonArrayBuilder builder = factory.createArrayBuilder();
         for (final String value : values) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 75e9811..46b465d 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -61,8 +61,6 @@ import org.apache.nifi.remote.TransferDirection;
         + "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
 public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .description("The value to use for the platform field in each status record.")
@@ -71,6 +69,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         .defaultValue("nifi")
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+
     static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
         .name("Component Type Filter Regex")
         .description("A regex specifying which component types to report.  Any component type matching this regex will be included.  "
@@ -80,6 +79,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
         .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
         .build();
+
     static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
         .name("Component Name Filter Regex")
         .description("A regex specifying which component names to report.  Any component name matching this regex will be included.")
@@ -198,7 +198,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
      *            The component name
      * @return Whether the component matches both filters
      */
-    boolean componentMatchesFilters(final String componentType, final String componentName) {
+    private boolean componentMatchesFilters(final String componentType, final String componentName) {
         return componentTypeFilter.matcher(componentType).matches()
                 && componentNameFilter.matcher(componentName).matches();
     }
@@ -222,7 +222,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
      * @param parentId
      *            The parent's component id
      */
-    void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
+    private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
             final ProcessGroupStatus status, final DateFormat df,
         final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
@@ -279,7 +279,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
+    private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory,
             final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName,
             final String platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
@@ -304,7 +304,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
+    private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status,
             final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentName = status.getName();
@@ -328,7 +328,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
+    private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
             final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentType = "Connection";
@@ -356,7 +356,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
+    private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
             final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentType = "Processor";
@@ -387,7 +387,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         }
     }
 
-    private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
+    private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname,
             final String applicationName, final String platform, final String parentId, final Date currentDate,
             final String componentType, final String componentName) {
         addField(builder, "statusId", UUID.randomUUID().toString());
@@ -401,23 +401,4 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
         addField(builder, "application", applicationName);
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
-        if (value != null) {
-            builder.add(key, value.intValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
-        if (value == null) {
-            return;
-        }
-
-        builder.add(key, value);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index 0aced94..652b581 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -15,4 +15,5 @@
 
 org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
 org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
-org.apache.nifi.reporting.SiteToSiteStatusReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.SiteToSiteStatusReportingTask
+org.apache.nifi.reporting.SiteToSiteMetricsReportingTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
new file mode 100644
index 0000000..8120d6a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
@@ -0,0 +1,178 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>SiteToSiteMetricsReportingTask</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+    	<p>
+    		The Site-to-Site Metrics Reporting Task allows the user to publish NiFi's metrics (as in the Ambari reporting task) to the 
+    		same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
+    		all of the different Processors that are available in NiFi in order to process or distribute that data.
+    	</p>
+    	
+    	<h2>Ambari format</h2>
+    	
+    	<p>
+    		There are two available output formats. The first one is the Ambari format as defined in the Ambari Metrics Collector 
+    		API, which is a JSON document with dynamic keys. If you use this format, you might be interested in the Jolt specification 
+    		below to transform the data.
+    	</p>
+    	
+		<pre>
+			<code>
+			[
+			  {
+			    "operation": "shift",
+			    "spec": {
+			      "metrics": {
+			        "*": {
+			          "metrics": {
+			            "*": {
+			              "$": "metrics.[#4].metrics.time",
+			              "@": "metrics.[#4].metrics.value"
+			            }
+			          },
+			          "*": "metrics.[&1].&"
+			        }
+			      }
+			    }
+			  }
+			]
+			</code>
+		</pre>
+		
+		<p>
+    		This would transform the below sample:
+    	</p>
+    	
+		<pre>
+			<code>
+			{
+				"metrics": [{
+					"metricname": "jvm.gc.time.G1OldGeneration",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"1520456854361": "0"
+					}
+				}, {
+					"metricname": "jvm.thread_states.terminated",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"1520456854361": "0"
+					}
+				}]
+			}
+			</code>
+		</pre>
+
+		<p>
+    		into:
+    	</p>
+    	
+		<pre>
+			<code>
+			{
+				"metrics": [{
+					"metricname": "jvm.gc.time.G1OldGeneration",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"time": "1520456854361",
+						"value": "0"
+					}
+				}, {
+					"metricname": "jvm.thread_states.terminated",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"time": "1520456854361",
+						"value": "0"
+					}
+				}]
+			}
+			</code>
+		</pre>
+    	
+    	<h2>Record format</h2>
+   	
+    	<p>
+    		The second format leverages the record framework of NiFi so that the user can define a Record Writer and directly 
+    		specify the output format and data, assuming that the input schema is the following:
+    	</p>
+
+		<pre>
+			<code>
+			{
+			  "type" : "record",
+			  "name" : "metrics",
+			  "namespace" : "metrics",
+			  "fields" : [ 
+				{ "name" : "appid", "type" : "string" },
+				{ "name" : "instanceid", "type" : "string" },
+				{ "name" : "hostname", "type" : "string" },
+				{ "name" : "timestamp", "type" : "long" },
+				{ "name" : "loadAverage1min", "type" : "double" },
+				{ "name" : "availableCores", "type" : "int" },
+				{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
+				{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
+				{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
+				{ "name" : "BytesSentLast5Minutes", "type" : "long" },
+				{ "name" : "FlowFilesQueued", "type" : "int" },
+				{ "name" : "BytesQueued", "type" : "long" },
+				{ "name" : "BytesReadLast5Minutes", "type" : "long" },
+				{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
+				{ "name" : "ActiveThreads", "type" : "int" },
+				{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
+				{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
+				{ "name" : "jvmuptime", "type" : "long" },
+				{ "name" : "jvmheap_used", "type" : "double" },
+				{ "name" : "jvmheap_usage", "type" : "double" },
+				{ "name" : "jvmnon_heap_usage", "type" : "double" },
+				{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_count", "type" : "int" },
+				{ "name" : "jvmdaemon_thread_count", "type" : "int" },
+				{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
+				{ "name" : "jvmgcruns", "type" : ["long", "null"] },
+				{ "name" : "jvmgctime", "type" : ["long", "null"] }
+			  ]
+			}
+			</code>
+		</pre>
+
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
index e1841b2..86736a6 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
@@ -25,7 +25,7 @@
     	<p>
     		The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to
     		the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of
-    		all of the different Processors that are available in NiFi in order to processor or distribute that data. When possible, it is
+    		all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is
     		advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because
     		when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there
     		is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
new file mode 100644
index 0000000..90dea10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
@@ -0,0 +1,37 @@
+{
+  "type" : "record",
+  "name" : "metrics",
+  "namespace" : "metrics",
+  "fields" : [ 
+	{ "name" : "appid", "type" : "string" },
+	{ "name" : "instanceid", "type" : "string" },
+	{ "name" : "hostname", "type" : "string" },
+	{ "name" : "timestamp", "type" : "long" },
+	{ "name" : "loadAverage1min", "type" : "double" },
+	{ "name" : "availableCores", "type" : "int" },
+	{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
+	{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
+	{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
+	{ "name" : "BytesSentLast5Minutes", "type" : "long" },
+	{ "name" : "FlowFilesQueued", "type" : "int" },
+	{ "name" : "BytesQueued", "type" : "long" },
+	{ "name" : "BytesReadLast5Minutes", "type" : "long" },
+	{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
+	{ "name" : "ActiveThreads", "type" : "int" },
+	{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
+	{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
+	{ "name" : "jvmuptime", "type" : "long" },
+	{ "name" : "jvmheap_used", "type" : "double" },
+	{ "name" : "jvmheap_usage", "type" : "double" },
+	{ "name" : "jvmnon_heap_usage", "type" : "double" },
+	{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
+	{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
+	{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
+	{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
+	{ "name" : "jvmthread_count", "type" : "int" },
+	{ "name" : "jvmdaemon_thread_count", "type" : "int" },
+	{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
+	{ "name" : "jvmgcruns", "type" : ["long", "null"] },
+	{ "name" : "jvmgctime", "type" : ["long", "null"] }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
new file mode 100644
index 0000000..c699a1c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.json.JsonValue;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestSiteToSiteMetricsReportingTask {
+
+    private ReportingContext context;
+    private ProcessGroupStatus status;
+    private TestRunner runner;
+
+    /**
+     * Creates the controller status fixture used by every test: a root process group with id
+     * "1234" and fixed counters, holding one processor status and one child process group.
+     */
+    @Before
+    public void setup() {
+        // processor status that carries a processing time
+        final ProcessorStatus processor = new ProcessorStatus();
+        processor.setProcessingNanos(123456789);
+
+        final Collection<ProcessorStatus> processors = new ArrayList<>();
+        processors.add(processor);
+
+        // child group that reports the same processor statuses
+        final ProcessGroupStatus childGroup = new ProcessGroupStatus();
+        childGroup.setProcessorStatus(processors);
+
+        final Collection<ProcessGroupStatus> childGroups = new ArrayList<>();
+        childGroups.add(childGroup);
+
+        // root group exposed to the tests through the status field
+        final ProcessGroupStatus root = new ProcessGroupStatus();
+        root.setId("1234");
+        root.setFlowFilesReceived(5);
+        root.setBytesReceived(10000);
+        root.setFlowFilesSent(10);
+        root.setBytesSent(20000);
+        root.setQueuedCount(100);
+        root.setQueuedContentSize(1024L);
+        root.setBytesRead(60000L);
+        root.setBytesWritten(80000L);
+        root.setActiveThreadCount(5);
+        root.setProcessorStatus(processors);
+        root.setProcessGroupStatus(childGroups);
+
+        status = root;
+    }
+
+    /**
+     * Creates a {@link MockSiteToSiteMetricsReportingTask}, initializes it and prepares the shared mocked
+     * {@link ReportingContext}.
+     * <p>
+     * Each supported property resolves to its default value unless {@code customProperties} overrides it.
+     * The record writer property resolves to a mocked property value whose controller service is a
+     * {@link MockRecordWriter}, and the event access returns the status stored in the {@code status} field.
+     *
+     * @param customProperties property values that take precedence over the defaults
+     * @return the initialized task
+     */
+    public MockSiteToSiteMetricsReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
+
+        final MockSiteToSiteMetricsReportingTask reportingTask = new MockSiteToSiteMetricsReportingTask();
+
+        // defaults first, then the caller's overrides
+        final Map<PropertyDescriptor, String> effectiveProperties = new HashMap<>();
+        for (final PropertyDescriptor supported : reportingTask.getSupportedPropertyDescriptors()) {
+            effectiveProperties.put(supported, supported.getDefaultValue());
+        }
+        effectiveProperties.putAll(customProperties);
+
+        context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
+
+        // generic lookup: has to be stubbed before the record writer specific stub further down
+        Mockito.doAnswer(invocation -> {
+            final PropertyDescriptor requested = invocation.getArgumentAt(0, PropertyDescriptor.class);
+            return new MockPropertyValue(effectiveProperties.get(requested));
+        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+        Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter();
+        final PropertyValue writerProperty = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(context.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(writerProperty);
+        Mockito.when(writerProperty.asControllerService(RecordSetWriterFactory.class)).thenReturn(recordWriter);
+
+        final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+        reportingTask.initialize(initContext);
+
+        return reportingTask;
+    }
+
+    /**
+     * Validation must return exactly one result, whose input is the record writer display name, when
+     * the Ambari format is selected while a record writer is also set.
+     */
+    @Test
+    public void testValidationBothAmbariFormatRecordWriter() throws IOException {
+        final String plainUrl = "http://localhost:8080/nifi";
+        final String expressionUrl = "http://${hostname(true)}:8080/nifi";
+
+        final ValidationContext validationContext = Mockito.mock(ValidationContext.class);
+        final MockSiteToSiteMetricsReportingTask reportingTask = new MockSiteToSiteMetricsReportingTask();
+
+        // start from the defaults, then select the Ambari format and fill in the Site-to-Site settings
+        final Map<PropertyDescriptor, String> configured = new HashMap<>();
+        for (final PropertyDescriptor supported : reportingTask.getSupportedPropertyDescriptors()) {
+            configured.put(supported, supported.getDefaultValue());
+        }
+        configured.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
+        configured.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, plainUrl);
+        configured.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, plainUrl);
+        configured.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
+
+        // both URL spellings resolve to the same property value, which evaluates to the plain URL
+        final PropertyValue urlProperty = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.newPropertyValue(plainUrl)).thenReturn(urlProperty);
+        Mockito.when(validationContext.newPropertyValue(expressionUrl)).thenReturn(urlProperty);
+        Mockito.when(urlProperty.evaluateAttributeExpressions()).thenReturn(urlProperty);
+        Mockito.when(urlProperty.getValue()).thenReturn(plainUrl);
+
+        // generic property lookup; the record writer stub below overrides it for that one descriptor
+        Mockito.doAnswer(invocation -> {
+            final PropertyDescriptor requested = invocation.getArgumentAt(0, PropertyDescriptor.class);
+            return new MockPropertyValue(configured.get(requested));
+        }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        final PropertyValue writerProperty = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(writerProperty);
+        Mockito.when(writerProperty.isSet()).thenReturn(true);
+
+        // invalid: the Ambari format and a record writer are both configured
+        final Collection<ValidationResult> problems = reportingTask.validate(validationContext);
+        assertEquals(1, problems.size());
+        assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), problems.iterator().next().getInput());
+    }
+
+    /**
+     * Validation must return exactly one result, whose input is the record writer display name, when
+     * the record format is selected but no record writer is set.
+     */
+    @Test
+    public void testValidationRecordFormatNoRecordWriter() throws IOException {
+        ValidationContext validationContext = Mockito.mock(ValidationContext.class);
+        final String urlEL = "http://${hostname(true)}:8080/nifi";
+        final String url = "http://localhost:8080/nifi";
+
+        final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
+
+        // start from the defaults, then select the record format and fill in the Site-to-Site settings
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
+        properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
+
+        // both URL spellings resolve to the same property value, which evaluates to the plain URL
+        final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl);
+        Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.getValue()).thenReturn(url);
+
+        // generic property lookup; the record writer stub below overrides it for that one descriptor
+        Mockito.doAnswer(invocation -> {
+            final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+            return new MockPropertyValue(properties.get(descriptor));
+        }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // the record writer property reports itself as unset
+        final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
+        Mockito.when(pValue.isSet()).thenReturn(false);
+
+        // should be invalid because the record format is selected but no record writer is set
+        Collection<ValidationResult> list = task.validate(validationContext);
+        Assert.assertEquals(1, list.size());
+        Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput());
+    }
+
+    /**
+     * Triggers the task with the Ambari format and inspects the single JSON payload that is sent.
+     * Each metric visited must carry the "nifi" application id and the "1234" instance id; the test
+     * passes once the "FlowFilesQueued" metric is found with the value "100" and fails if that
+     * metric is missing.
+     */
+    @Test
+    public void testAmbariFormat() throws IOException, InitializationException {
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
+
+        MockSiteToSiteMetricsReportingTask task = initTask(properties);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+
+        // Re-encode with the same charset that was used for decoding; the no-arg getBytes() would
+        // fall back to the platform default charset. The reader is closed once the array is extracted.
+        final JsonArray array;
+        try (JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)))) {
+            array = jsonReader.readObject().getJsonArray("metrics");
+        }
+
+        for (int i = 0; i < array.size(); i++) {
+            JsonObject object = array.getJsonObject(i);
+            assertEquals("nifi", object.getString("appid"));
+            assertEquals("1234", object.getString("instanceid"));
+            if (object.getString("metricname").equals("FlowFilesQueued")) {
+                for (Entry<String, JsonValue> kv : object.getJsonObject("metrics").entrySet()) {
+                    assertEquals("\"100\"", kv.getValue().toString());
+                }
+                return;
+            }
+        }
+        fail("No FlowFilesQueued metric found in the Ambari payload");
+    }
+
+    /**
+     * Triggers the task with the record format and checks the single payload that is sent: split on
+     * commas, it must start with "nifi" and "1234" and hold "100" at index 10 (FlowFilesQueued).
+     */
+    @Test
+    public void testRecordFormat() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
+        properties.put(SiteToSiteMetricsReportingTask.RECORD_WRITER, "record-writer");
+        MockSiteToSiteMetricsReportingTask task = initTask(properties);
+
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        // decode with an explicit charset instead of relying on the platform default
+        String[] data = new String(task.dataSent.get(0), StandardCharsets.UTF_8).split(",");
+        assertEquals("\"nifi\"", data[0]);
+        assertEquals("\"1234\"", data[1]);
+        assertEquals("\"100\"", data[10]); // FlowFilesQueued
+    }
+
+    /**
+     * Reporting task whose Site-to-Site client is a Mockito mock. Every byte array passed to
+     * {@code Transaction.send(byte[], Map)} is captured in {@link #dataSent} instead of being transmitted.
+     */
+    private static final class MockSiteToSiteMetricsReportingTask extends SiteToSiteMetricsReportingTask {
+
+        /** Payloads handed to the mocked transaction, in the order they were sent. */
+        final List<byte[]> dataSent = new ArrayList<>();
+
+        public MockSiteToSiteMetricsReportingTask() throws IOException {
+            super();
+        }
+
+        /**
+         * Builds a mocked client whose transactions record sent payloads.
+         *
+         * @return a mocked {@link SiteToSiteClient} returning the recording transaction for any direction
+         */
+        @Override
+        protected SiteToSiteClient getClient() {
+            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+            final Transaction transaction = Mockito.mock(Transaction.class);
+
+            try {
+                Mockito.doAnswer(invocation -> {
+                    dataSent.add(invocation.getArgumentAt(0, byte[].class));
+                    return null;
+                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+            } catch (final Exception e) {
+                // Fail the test with the original exception attached as the cause, rather than
+                // printing the stack trace and reporting only its string form.
+                throw new AssertionError(e.toString(), e);
+            }
+
+            return client;
+        }
+    }
+
+}


[2/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Posted by ma...@apache.org.
NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

Fixed dependency issue by providing a local JSON reader

Rebased + fixed conflict + updated versions in pom + EL scope

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2575


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6fbe1515
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6fbe1515
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6fbe1515

Branch: refs/heads/master
Commit: 6fbe1515eefd2071dc75a1de2c1fc15cc282da76
Parents: ce0855e
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jan 23 23:15:18 2018 +0100
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Apr 11 14:44:30 2018 -0400

----------------------------------------------------------------------
 .../nifi-ambari-reporting-task/pom.xml          |  15 +-
 .../reporting/ambari/AmbariReportingTask.java   |   4 +-
 .../reporting/ambari/api/MetricBuilder.java     |  84 ----
 .../nifi/reporting/ambari/api/MetricFields.java |  29 --
 .../reporting/ambari/api/MetricsBuilder.java    |  93 ----
 .../reporting/ambari/metrics/MetricNames.java   |  55 ---
 .../ambari/metrics/MetricsService.java          | 131 ------
 .../ambari/api/TestMetricsBuilder.java          |   2 +
 .../ambari/metrics/TestMetricsService.java      |   2 +
 .../nifi-reporting-utils/pom.xml                |  10 +
 .../reporting/util/metrics/MetricNames.java     |  59 +++
 .../reporting/util/metrics/MetricsService.java  | 230 ++++++++++
 .../util/metrics/api/MetricBuilder.java         |  84 ++++
 .../util/metrics/api/MetricFields.java          |  29 ++
 .../util/metrics/api/MetricsBuilder.java        |  93 ++++
 .../nifi-site-to-site-reporting-task/pom.xml    |  39 +-
 .../AbstractSiteToSiteReportingTask.java        | 420 ++++++++++++++++++-
 .../SiteToSiteBulletinReportingTask.java        |  18 +-
 .../SiteToSiteMetricsReportingTask.java         | 222 ++++++++++
 .../SiteToSiteProvenanceReportingTask.java      |  28 +-
 .../SiteToSiteStatusReportingTask.java          |  37 +-
 .../org.apache.nifi.reporting.ReportingTask     |   3 +-
 .../additionalDetails.html                      | 178 ++++++++
 .../additionalDetails.html                      |   2 +-
 .../src/main/resources/schema-metrics.avsc      |  37 ++
 .../TestSiteToSiteMetricsReportingTask.java     | 296 +++++++++++++
 26 files changed, 1715 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
index dafe829..de024e2 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
@@ -30,21 +30,11 @@
             <artifactId>jersey-client</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-            <version>1.0.4</version>
-        </dependency>
-        <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
             <version>1.0</version>
         </dependency>
         <dependency>
-            <groupId>com.yammer.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>2.2.0</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
@@ -53,6 +43,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>1.7.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-reporting-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index 5bbdecb..0568b3e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -29,8 +29,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
-import org.apache.nifi.reporting.ambari.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
 import javax.json.Json;

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
deleted file mode 100644
index 8e234ce..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-
-/**
- * Fluent builder producing the {@link JsonObject} that describes one metric.
- * <p>
- * Each setter stores its argument and returns this builder; {@link #build()} writes the stored
- * values under the field names declared in {@link MetricFields}.
- */
-public class MetricBuilder {
-
-    private final JsonBuilderFactory jsonFactory;
-
-    private String appId;
-    private String instance;
-    private String host;
-    private String timestampText;
-    private String name;
-    private String value;
-
-    /**
-     * @param factory factory used by {@link #build()} to create the JSON object builders
-     */
-    public MetricBuilder(final JsonBuilderFactory factory) {
-        jsonFactory = factory;
-    }
-
-    /** Sets the application id and returns this builder. */
-    public MetricBuilder applicationId(final String applicationId) {
-        appId = applicationId;
-        return this;
-    }
-
-    /** Sets the instance id and returns this builder. */
-    public MetricBuilder instanceId(final String instanceId) {
-        instance = instanceId;
-        return this;
-    }
-
-    /** Sets the hostname and returns this builder. */
-    public MetricBuilder hostname(final String hostname) {
-        host = hostname;
-        return this;
-    }
-
-    /** Stores the timestamp in its decimal string form and returns this builder. */
-    public MetricBuilder timestamp(final long timestamp) {
-        timestampText = Long.toString(timestamp);
-        return this;
-    }
-
-    /** Sets the metric name and returns this builder. */
-    public MetricBuilder metricName(final String metricName) {
-        name = metricName;
-        return this;
-    }
-
-    /** Sets the metric value and returns this builder. */
-    public MetricBuilder metricValue(final String metricValue) {
-        value = metricValue;
-        return this;
-    }
-
-    /**
-     * Assembles the metric object. The stored timestamp is written as both the timestamp and the
-     * start time, and the metric value is placed in a nested object keyed by that timestamp.
-     *
-     * @return the JSON object for this metric
-     */
-    public JsonObject build() {
-        return jsonFactory.createObjectBuilder()
-                .add(MetricFields.METRIC_NAME, name)
-                .add(MetricFields.APP_ID, appId)
-                .add(MetricFields.INSTANCE_ID, instance)
-                .add(MetricFields.HOSTNAME, host)
-                .add(MetricFields.TIMESTAMP, timestampText)
-                .add(MetricFields.START_TIME, timestampText)
-                .add(MetricFields.METRICS,
-                        jsonFactory.createObjectBuilder().add(String.valueOf(timestampText), value))
-                .build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
deleted file mode 100644
index 1c1629c..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-public interface MetricFields { // JSON field names that MetricBuilder.build() writes for a single metric
-
-    String METRIC_NAME = "metricname"; // key holding the metric's name
-    String APP_ID = "appid"; // key holding the application id
-    String INSTANCE_ID = "instanceid"; // key holding the instance id
-    String HOSTNAME = "hostname"; // key holding the hostname
-    String TIMESTAMP = "timestamp"; // key holding the timestamp string
-    String START_TIME = "starttime"; // key that MetricBuilder.build() fills with the same timestamp string
-    String METRICS = "metrics"; // key holding the nested object that maps the timestamp to the metric value
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
deleted file mode 100644
index 11b4db5..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Builds the overall JsonObject for the Metrics.
- */
-public class MetricsBuilder {
-
-    static final String ROOT_JSON_ELEMENT = "metrics";
-
-    private final JsonBuilderFactory factory;
-
-    private long timestamp;
-    private String applicationId;
-    private String instanceId;
-    private String hostname;
-    private Map<String,String> metrics = new HashMap<>();
-
-    public MetricsBuilder(final JsonBuilderFactory factory) {
-        this.factory = factory;
-    }
-
-    public MetricsBuilder applicationId(final String applicationId) {
-        this.applicationId = applicationId;
-        return this;
-    }
-
-    public MetricsBuilder instanceId(final String instanceId) {
-        this.instanceId = instanceId;
-        return this;
-    }
-
-    public MetricsBuilder hostname(final String hostname) {
-        this.hostname = hostname;
-        return this;
-    }
-
-    public MetricsBuilder timestamp(final long timestamp) {
-        this.timestamp = timestamp;
-        return this;
-    }
-
-    public MetricsBuilder metric(final String name, String value) {
-        this.metrics.put(name, value);
-        return this;
-    }
-
-    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
-        this.metrics.putAll(metrics);
-        return this;
-    }
-
-    public JsonObject build() {
-        // builds JsonObject for individual metrics
-        final MetricBuilder metricBuilder = new MetricBuilder(factory);
-        metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
-
-        final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
-
-        for (Map.Entry<String,String> entry : metrics.entrySet()) {
-            metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
-            metricArrayBuilder.add(metricBuilder.build());
-        }
-
-        // add the array of metrics to a top-level json object
-        final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
-        metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
-        return metricsBuilder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
deleted file mode 100644
index 20cfa4e..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-/**
- * The Metric names to send to Ambari.
- */
-public interface MetricNames {
-
-    // Metric Name separator
-    String METRIC_NAME_SEPARATOR = ".";
-
-    // NiFi Metrics
-    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
-    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
-    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
-    String BYTES_SENT = "BytesSentLast5Minutes";
-    String FLOW_FILES_QUEUED = "FlowFilesQueued";
-    String BYTES_QUEUED = "BytesQueued";
-    String BYTES_READ = "BytesReadLast5Minutes";
-    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
-    String ACTIVE_THREADS = "ActiveThreads";
-    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
-    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
-
-    // JVM Metrics
-    String JVM_UPTIME = "jvm.uptime";
-    String JVM_HEAP_USED = "jvm.heap_used";
-    String JVM_HEAP_USAGE = "jvm.heap_usage";
-    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
-    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
-    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
-    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
-    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
-    String JVM_THREAD_COUNT = "jvm.thread_count";
-    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
-    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
-    String JVM_GC_RUNS = "jvm.gc.runs";
-    String JVM_GC_TIME = "jvm.gc.time";
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
deleted file mode 100644
index cef257d..0000000
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-import com.yammer.metrics.core.VirtualMachineMetrics;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service used to produce key/value metrics based on a given input.
- */
-public class MetricsService {
-
-    /**
-     * Generates a Map of metrics for a ProcessGroupStatus instance.
-     *
-     * @param status a ProcessGroupStatus to get metrics from
-     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
-     * @return a map of metrics for the given status
-     */
-    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
-        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
-        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
-        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
-        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
-        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
-        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
-
-        final long durationNanos = calculateProcessingNanos(status);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
-
-        final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
-
-        return metrics;
-    }
-
-    /**
-     * Generates a Map of metrics for VirtualMachineMetrics.
-     *
-     * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
-     * @return a map of metrics from the given VirtualMachineStatus
-     */
-    public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
-        metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
-        metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
-        metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
-        metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
-        metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
-        metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
-
-        for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
-            final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
-            switch(entry.getKey()) {
-                case BLOCKED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
-                    break;
-                case RUNNABLE:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
-                    break;
-                case TERMINATED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
-                    break;
-                case TIMED_WAITING:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
-            final String gcName = entry.getKey().replace(" ", "");
-            final long runs = entry.getValue().getRuns();
-            final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
-            metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
-            metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
-        }
-
-        return metrics;
-    }
-
-    // calculates the total processing time of all processors in nanos
-    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
-        long nanos = 0L;
-
-        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
-            nanos += procStats.getProcessingNanos();
-        }
-
-        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
-            nanos += calculateProcessingNanos(childGroupStatus);
-        }
-
-        return nanos;
-    }
-
-    // append the process group ID if necessary
-    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
-        if(appendPgId) {
-            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
-        } else {
-            return name;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
index cdaa453..9b96eb9 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.reporting.ambari.api;
 
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index 93224eb..ec0cf6e 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics;
 import com.yammer.metrics.core.VirtualMachineMetrics;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
index ba10afb..3e2d158 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
@@ -40,6 +40,16 @@
             <artifactId>commons-lang3</artifactId>
             <version>3.7</version>
         </dependency>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
new file mode 100644
index 0000000..19bb90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+/**
+ * The names of the NiFi, JVM and OS metrics exposed by the reporting utilities.
+ * These names were originally defined for the Ambari reporting task and are now
+ * kept in the shared reporting utilities module.
+ */
+public interface MetricNames {
+
+    // Metric Name separator, placed between a metric name and an appended process group ID
+    String METRIC_NAME_SEPARATOR = ".";
+
+    // NiFi Metrics
+    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+    String BYTES_SENT = "BytesSentLast5Minutes";
+    String FLOW_FILES_QUEUED = "FlowFilesQueued";
+    String BYTES_QUEUED = "BytesQueued";
+    String BYTES_READ = "BytesReadLast5Minutes";
+    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+    String ACTIVE_THREADS = "ActiveThreads";
+    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
+    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+
+    // JVM Metrics
+    String JVM_UPTIME = "jvm.uptime";
+    String JVM_HEAP_USED = "jvm.heap_used";
+    String JVM_HEAP_USAGE = "jvm.heap_usage";
+    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+    String JVM_THREAD_COUNT = "jvm.thread_count";
+    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+    // prefixes: a dot and the garbage collector name are appended to these two names
+    String JVM_GC_RUNS = "jvm.gc.runs";
+    String JVM_GC_TIME = "jvm.gc.time";
+
+    // OS Metrics
+    String LOAD1MN = "loadAverage1min";
+    String CORES = "availableCores";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
new file mode 100644
index 0000000..ed3922a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+    /**
+     * Builds the string-valued metrics of a process group by merging its long-typed and
+     * integer-typed metrics into a single map.
+     *
+     * @param status the ProcessGroupStatus to read the metrics from
+     * @param appendPgId if true, the process group ID is appended at the end of every metric name
+     * @return a map from metric name to the metric value rendered as a String
+     */
+    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,String> result = new HashMap<>();
+
+        // long-typed values first, exactly as they are produced by getLongMetrics
+        for (final Map.Entry<String,Long> longEntry : getLongMetrics(status, appendPgId).entrySet()) {
+            result.put(longEntry.getKey(), String.valueOf(longEntry.getValue()));
+        }
+
+        // then the integer-typed values produced by getIntegerMetrics
+        for (final Map.Entry<String,Integer> intEntry : getIntegerMetrics(status, appendPgId).entrySet()) {
+            result.put(intEntry.getKey(), String.valueOf(intEntry.getValue()));
+        }
+
+        return result;
+    }
+
+    // Collects the integer-typed process group metrics: flow files received, sent and queued, and active threads.
+    private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,Integer> counts = new HashMap<>();
+
+        final String receivedName = appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId);
+        counts.put(receivedName, status.getFlowFilesReceived());
+
+        final String sentName = appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId);
+        counts.put(sentName, status.getFlowFilesSent());
+
+        final String queuedName = appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId);
+        counts.put(queuedName, status.getQueuedCount());
+
+        final String threadsName = appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId);
+        counts.put(threadsName, status.getActiveThreadCount());
+
+        return counts;
+    }
+
+    // Collects the long-typed process group metrics: byte counts plus the total task duration in nanoseconds and in seconds.
+    private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) {
+        final Map<String,Long> values = new HashMap<>();
+
+        values.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived());
+        values.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent());
+        values.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize());
+        values.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead());
+        values.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten());
+
+        // the same total duration is reported twice, once per unit
+        final long totalNanos = calculateProcessingNanos(status);
+        final long totalSeconds = TimeUnit.NANOSECONDS.toSeconds(totalNanos);
+        values.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), totalNanos);
+        values.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), totalSeconds);
+
+        return values;
+    }
+
+    /**
+     * Builds the string-valued JVM metrics by merging the integer-typed, long-typed and
+     * double-typed metrics read from a VirtualMachineMetrics instance.
+     *
+     * @param virtualMachineMetrics the VirtualMachineMetrics instance to read the metrics from
+     * @return a map from metric name to the metric value rendered as a String
+     */
+    public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,String> result = new HashMap<>();
+
+        for (final Map.Entry<String,Integer> intEntry : getIntegerMetrics(virtualMachineMetrics).entrySet()) {
+            result.put(intEntry.getKey(), String.valueOf(intEntry.getValue()));
+        }
+
+        for (final Map.Entry<String,Long> longEntry : getLongMetrics(virtualMachineMetrics).entrySet()) {
+            result.put(longEntry.getKey(), String.valueOf(longEntry.getValue()));
+        }
+
+        for (final Map.Entry<String,Double> doubleEntry : getDoubleMetrics(virtualMachineMetrics).entrySet()) {
+            result.put(doubleEntry.getKey(), String.valueOf(doubleEntry.getValue()));
+        }
+
+        return result;
+    }
+
+    /**
+     * Sums the processing time, in nanoseconds, of every processor of the given group and,
+     * recursively, of every processor of its child groups.
+     *
+     * @param status the process group to sum the processing time for
+     * @return the total processing time in nanoseconds
+     */
+    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+        long total = 0L;
+
+        // processors that belong directly to this group
+        for (final ProcessorStatus processor : status.getProcessorStatus()) {
+            total += processor.getProcessingNanos();
+        }
+
+        // processors of the nested groups
+        for (final ProcessGroupStatus child : status.getProcessGroupStatus()) {
+            total += calculateProcessingNanos(child);
+        }
+
+        return total;
+    }
+
+    // Returns the metric name unchanged, or followed by the separator and the process group ID when appendPgId is true.
+    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
+        if (!appendPgId) {
+            return name;
+        }
+        return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+    }
+
+    // Collects the double-typed JVM metrics: heap used, heap usage, non-heap usage and file descriptor usage.
+    private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Double> usages = new HashMap<>();
+
+        final double heapUsed = virtualMachineMetrics.heapUsed();
+        usages.put(MetricNames.JVM_HEAP_USED, heapUsed);
+
+        final double heapUsage = virtualMachineMetrics.heapUsage();
+        usages.put(MetricNames.JVM_HEAP_USAGE, heapUsage);
+
+        final double nonHeapUsage = virtualMachineMetrics.nonHeapUsage();
+        usages.put(MetricNames.JVM_NON_HEAP_USAGE, nonHeapUsage);
+
+        final double fileDescriptorUsage = virtualMachineMetrics.fileDescriptorUsage();
+        usages.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, fileDescriptorUsage);
+
+        return usages;
+    }
+
+    // Collects the long-typed JVM metrics: uptime plus, for each garbage collector, its run count and its time in milliseconds.
+    private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Long> values = new HashMap<>();
+        values.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime());
+
+        final Map<String,VirtualMachineMetrics.GarbageCollectorStats> collectors = virtualMachineMetrics.garbageCollectors();
+        for (final Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> collector : collectors.entrySet()) {
+            // spaces are removed from the collector name before it is appended to the metric name
+            final String suffix = "." + collector.getKey().replace(" ", "");
+            final VirtualMachineMetrics.GarbageCollectorStats stats = collector.getValue();
+
+            values.put(MetricNames.JVM_GC_RUNS + suffix, stats.getRuns());
+            values.put(MetricNames.JVM_GC_TIME + suffix, stats.getTime(TimeUnit.MILLISECONDS));
+        }
+
+        return values;
+    }
+
+    // Collects the integer-typed JVM metrics: thread counts plus the thread state percentages
+    // multiplied by 100 and truncated to int.
+    private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) {
+        final Map<String,Integer> values = new HashMap<>();
+        values.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
+        values.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount());
+
+        for (final Map.Entry<Thread.State,Double> stateEntry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
+            // resolve the metric name for the thread state; states without a name are not reported
+            final String metricName;
+            switch (stateEntry.getKey()) {
+                case BLOCKED:
+                    metricName = MetricNames.JVM_THREAD_STATES_BLOCKED;
+                    break;
+                case RUNNABLE:
+                    metricName = MetricNames.JVM_THREAD_STATES_RUNNABLE;
+                    break;
+                case TERMINATED:
+                    metricName = MetricNames.JVM_THREAD_STATES_TERMINATED;
+                    break;
+                case TIMED_WAITING:
+                    metricName = MetricNames.JVM_THREAD_STATES_TIMED_WAITING;
+                    break;
+                default:
+                    metricName = null;
+                    break;
+            }
+
+            if (metricName == null) {
+                continue;
+            }
+
+            // a missing percentage is reported as 0
+            final Double percentage = stateEntry.getValue();
+            values.put(metricName, percentage == null ? 0 : (int) (100 * percentage));
+        }
+
+        return values;
+    }
+
+    /**
+     * Builds a single JSON object holding the identification fields, the OS metrics, the JVM
+     * metrics and the metrics of the given process group.
+     * <p>
+     * The dots of the JVM metric names are removed to form the JSON field names; the process
+     * group metric names are used as is (no process group ID is appended). Double values that
+     * are NaN or infinite are written as JSON null, because
+     * {@code JsonObjectBuilder.add(String, double)} rejects them with a NumberFormatException.
+     *
+     * @param factory the factory used to create the JSON object builder
+     * @param status the process group to read the NiFi metrics and the instance id from
+     * @param virtualMachineMetrics the source of the JVM metrics
+     * @param applicationId the value of the application id field
+     * @param id not used by this method; the instance id field is taken from {@code status.getId()}
+     * @param hostname the value of the hostname field
+     * @param currentTimeMillis the value of the timestamp field
+     * @param availableProcessors the value of the available cores field
+     * @param systemLoad the value of the 1 minute load average field
+     * @return the JSON object containing all the fields listed above
+     */
+    public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics,
+            String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) {
+        final JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.INSTANCE_ID, status.getId())
+                .add(MetricFields.TIMESTAMP, currentTimeMillis);
+
+        objectBuilder.add(MetricNames.CORES, availableProcessors);
+        addDouble(objectBuilder, MetricNames.LOAD1MN, systemLoad);
+
+        // JVM metrics: the dots of the metric names are dropped for the JSON field names
+        for (final Map.Entry<String,Integer> entry : getIntegerMetrics(virtualMachineMetrics).entrySet()) {
+            objectBuilder.add(entry.getKey().replace(".", ""), entry.getValue());
+        }
+
+        for (final Map.Entry<String,Long> entry : getLongMetrics(virtualMachineMetrics).entrySet()) {
+            objectBuilder.add(entry.getKey().replace(".", ""), entry.getValue());
+        }
+
+        // NOTE(review): presumably some of these can be NaN on platforms where the JVM cannot
+        // provide the underlying figure (e.g. file descriptor usage) - hence the guarded add
+        for (final Map.Entry<String,Double> entry : getDoubleMetrics(virtualMachineMetrics).entrySet()) {
+            addDouble(objectBuilder, entry.getKey().replace(".", ""), entry.getValue());
+        }
+
+        // process group metrics, without the process group ID in the names
+        for (final Map.Entry<String,Long> entry : getLongMetrics(status, false).entrySet()) {
+            objectBuilder.add(entry.getKey(), entry.getValue());
+        }
+
+        for (final Map.Entry<String,Integer> entry : getIntegerMetrics(status, false).entrySet()) {
+            objectBuilder.add(entry.getKey(), entry.getValue());
+        }
+
+        return objectBuilder.build();
+    }
+
+    // Adds a double field to the builder, writing JSON null when the value is NaN or infinite.
+    private static void addDouble(final JsonObjectBuilder builder, final String name, final double value) {
+        if (Double.isNaN(value) || Double.isInfinite(value)) {
+            builder.addNull(name);
+        } else {
+            builder.add(name, value);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
new file mode 100644
index 0000000..81fb021
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ * <p>
+ * Every setter returns {@code this} so that calls can be chained. All fields must be set
+ * before {@link #build()} is invoked. {@code build()} does not reset any state, so one
+ * instance can be reused for several metrics by overwriting only the fields that change.
+ */
+public class MetricBuilder {
+
+    private final JsonBuilderFactory factory;
+
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    private String timestamp;
+    private String metricName;
+    private String metricValue;
+
+    /**
+     * @param factory the factory used to create every JSON object builder needed by {@link #build()}
+     */
+    public MetricBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * @param applicationId the value written under {@link MetricFields#APP_ID}
+     * @return this builder
+     */
+    public MetricBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /**
+     * @param instanceId the value written under {@link MetricFields#INSTANCE_ID}
+     * @return this builder
+     */
+    public MetricBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /**
+     * @param hostname the value written under {@link MetricFields#HOSTNAME}
+     * @return this builder
+     */
+    public MetricBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /**
+     * @param timestamp the timestamp of the metric; it is stored in its decimal string form and
+     *                  written under both {@link MetricFields#TIMESTAMP} and {@link MetricFields#START_TIME}
+     * @return this builder
+     */
+    public MetricBuilder timestamp(final long timestamp) {
+        this.timestamp = String.valueOf(timestamp);
+        return this;
+    }
+
+    /**
+     * @param metricName the value written under {@link MetricFields#METRIC_NAME}
+     * @return this builder
+     */
+    public MetricBuilder metricName(final String metricName) {
+        this.metricName = metricName;
+        return this;
+    }
+
+    /**
+     * @param metricValue the value of the metric, stored in the nested metrics object keyed by the timestamp
+     * @return this builder
+     */
+    public MetricBuilder metricValue(final String metricValue) {
+        this.metricValue = metricValue;
+        return this;
+    }
+
+    /**
+     * Creates the JSON representation of the metric from the current state of this builder.
+     * <p>
+     * The resulting object contains the name, application id, instance id, hostname, timestamp and
+     * start time (both set to the timestamp), plus a nested object under {@link MetricFields#METRICS}
+     * holding a single entry that maps the timestamp to the metric value.
+     *
+     * @return the JsonObject describing the metric
+     * @throws NullPointerException if any field has not been set; the message names the missing field
+     */
+    public JsonObject build() {
+        // Validate up front, in the order the fields are written, so that a missing value is
+        // reported by name instead of surfacing as an anonymous NPE from the JSON builder.
+        final String name = requireSet(metricName, "metricName");
+        final String appId = requireSet(applicationId, "applicationId");
+        final String instance = requireSet(instanceId, "instanceId");
+        final String host = requireSet(hostname, "hostname");
+        final String time = requireSet(timestamp, "timestamp");
+        final String value = requireSet(metricValue, "metricValue");
+
+        return factory.createObjectBuilder()
+                .add(MetricFields.METRIC_NAME, name)
+                .add(MetricFields.APP_ID, appId)
+                .add(MetricFields.INSTANCE_ID, instance)
+                .add(MetricFields.HOSTNAME, host)
+                .add(MetricFields.TIMESTAMP, time)
+                .add(MetricFields.START_TIME, time)
+                .add(MetricFields.METRICS,
+                        factory.createObjectBuilder()
+                                .add(time, value)
+                ).build();
+    }
+
+    /**
+     * Returns the given value, or fails with a descriptive NullPointerException when it is null.
+     */
+    private static String requireSet(final String value, final String fieldName) {
+        if (value == null) {
+            throw new NullPointerException(fieldName + " must be set before calling build()");
+        }
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
new file mode 100644
index 0000000..4c451ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+/**
+ * Names of the JSON fields written by {@link MetricBuilder#build()} for a single metric.
+ */
+public interface MetricFields {
+
+    /** Key holding the name of the metric. */
+    String METRIC_NAME = "metricname";
+    /** Key holding the application id. */
+    String APP_ID = "appid";
+    /** Key holding the instance id. */
+    String INSTANCE_ID = "instanceid";
+    /** Key holding the hostname. */
+    String HOSTNAME = "hostname";
+    /** Key holding the timestamp of the metric. */
+    String TIMESTAMP = "timestamp";
+    /** Key holding the start time; MetricBuilder writes the same value as for {@link #TIMESTAMP}. */
+    String START_TIME = "starttime";
+    /** Key holding the nested object that maps the timestamp to the metric value. */
+    String METRICS = "metrics";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
new file mode 100644
index 0000000..3694720
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Assembles the top-level JsonObject that wraps all collected metrics.
+ * <p>
+ * Metrics are accumulated as name/value pairs; {@link #build()} turns each pair into an
+ * individual metric object (via {@link MetricBuilder}) sharing the application id, instance id,
+ * hostname and timestamp configured on this builder.
+ */
+public class MetricsBuilder {
+
+    static final String ROOT_JSON_ELEMENT = "metrics";
+
+    private final JsonBuilderFactory factory;
+    private final Map<String,String> metrics = new HashMap<>();
+
+    private long timestamp;
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+
+    /**
+     * @param factory the factory used to create the JSON builders
+     */
+    public MetricsBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    /** Sets the application id shared by every metric. */
+    public MetricsBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    /** Sets the instance id shared by every metric. */
+    public MetricsBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    /** Sets the hostname shared by every metric. */
+    public MetricsBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    /** Sets the timestamp shared by every metric. */
+    public MetricsBuilder timestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+
+    /** Registers one metric; a later value for the same name replaces the earlier one. */
+    public MetricsBuilder metric(final String name, String value) {
+        metrics.put(name, value);
+        return this;
+    }
+
+    /** Registers every entry of the given map as a metric. */
+    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
+        this.metrics.putAll(metrics);
+        return this;
+    }
+
+    /**
+     * Produces a JsonObject with a single {@value #ROOT_JSON_ELEMENT} entry whose value is the
+     * array of individual metric objects.
+     *
+     * @return the JsonObject containing all registered metrics
+     */
+    public JsonObject build() {
+        // one reusable builder carries the fields common to all metrics
+        final MetricBuilder perMetric = new MetricBuilder(factory);
+        perMetric.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
+
+        final JsonArrayBuilder array = factory.createArrayBuilder();
+        metrics.forEach((name, value) -> array.add(perMetric.metricName(name).metricValue(value).build()));
+
+        // wrap the array in the top-level object
+        final JsonObjectBuilder root = factory.createObjectBuilder();
+        return root.add(ROOT_JSON_ELEMENT, array).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index c320ae2..93a3196 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -55,6 +55,23 @@
             <version>1.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
             <version>1.0.4</version>
@@ -83,10 +100,30 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/main/resources/schema-metrics.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 341a6d8..e755354 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -16,6 +16,23 @@
  */
 package org.apache.nifi.reporting;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+import javax.net.ssl.SSLContext;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -25,27 +42,51 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SerializedForm;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 
 /**
  * Base class for ReportingTasks that send data over site-to-site.
  */
 public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask {
+
+    protected static final String LAST_EVENT_ID_KEY = "last_event_id";
     protected static final String DESTINATION_URL_PATH = "/nifi";
+    protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
 
     static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
             .name("Destination URL")
@@ -141,8 +182,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
 
     protected volatile SiteToSiteClient siteToSiteClient;
+    protected volatile RecordSchema recordSchema;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -188,7 +237,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
         final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
                 : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
-                context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
+                        context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
 
         siteToSiteClient = new SiteToSiteClient.Builder()
                 .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
@@ -215,6 +264,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         return this.siteToSiteClient;
     }
 
+    /**
+     * Reads the JSON available from the given stream as records using {@code recordSchema} and
+     * serializes them with the configured Record Writer.
+     *
+     * @param context the reporting context providing the {@code RECORD_WRITER} property
+     * @param in the stream containing the JSON to convert
+     * @param attributes map that receives the writer's MIME type and the attributes of the write result
+     * @return the bytes produced by the record writer
+     * @throws ProcessException if reading the JSON or writing the records fails
+     */
+    protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
+        try (final JsonRecordReader recordReader = new JsonRecordReader(in, recordSchema)) {
+
+            final RecordSetWriterFactory factory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            final RecordSchema schemaForWrite = factory.getSchema(null, recordSchema);
+            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+            try (final RecordSetWriter recordWriter = factory.createWriter(getLogger(), schemaForWrite, buffer)) {
+                recordWriter.beginRecordSet();
+
+                for (Record next = recordReader.nextRecord(); next != null; next = recordReader.nextRecord()) {
+                    recordWriter.write(next);
+                }
+
+                final WriteResult result = recordWriter.finishRecordSet();
+
+                attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+                attributes.putAll(result.getAttributes());
+            }
+
+            // the writer is closed at this point, so the buffer holds the complete output
+            return buffer.toByteArray();
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e);
+        }
+    }
+
     static class NiFiUrlValidator implements Validator {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
@@ -236,4 +312,334 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             }
         }
     }
+
+    /**
+     * Adds the given Long to the builder under the given key; does nothing when the value is null.
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.longValue());
+    }
+
+    /**
+     * Adds the given Integer to the builder under the given key; does nothing when the value is null.
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.intValue());
+    }
+
+    /**
+     * Adds the given String to the builder under the given key; does nothing when the value is null.
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Adds the given String to the builder under the given key. When the value is null, a JSON
+     * null is written if {@code allowNullValues} is true; otherwise nothing is added.
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
+        if (value != null) {
+            builder.add(key, value);
+            return;
+        }
+
+        if (allowNullValues) {
+            builder.add(key, JsonValue.NULL);
+        }
+    }
+
+    /**
+     * Tree-based JSON {@link RecordReader} used by {@code getData} to turn JSON into
+     * {@link Record}s following a given schema.
+     * <p>
+     * The input may be a single JSON object or an array of objects. The first object is parsed by
+     * the constructor; any further objects are read on demand from the underlying parser.
+     * The class is static because it does not use any state of the enclosing reporting task.
+     */
+    private static class JsonRecordReader implements RecordReader {
+
+        private final RecordSchema recordSchema;
+        private final JsonParser jsonParser;
+        // true when the top-level JSON element is an array
+        private final boolean array;
+        // first object of the input, or null when the input does not start with an object
+        private final JsonNode firstJsonNode;
+        private boolean firstObjectConsumed = false;
+
+        // date/time formats built from the default formats of the corresponding record field types
+        private final Supplier<DateFormat> dateFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat());
+        private final Supplier<DateFormat> timeFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat());
+        private final Supplier<DateFormat> timestampFormat = () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat());
+
+        /**
+         * Opens a parser on the stream and eagerly reads the first JSON object, if there is one.
+         *
+         * @param in the stream containing the JSON
+         * @param recordSchema the schema returned by {@link #getSchema()} and applied to every record
+         * @throws IOException if the stream cannot be read
+         * @throws MalformedRecordException if the content cannot be parsed as JSON
+         */
+        public JsonRecordReader(final InputStream in, RecordSchema recordSchema) throws IOException, MalformedRecordException {
+            this.recordSchema = recordSchema;
+            try {
+                jsonParser = new JsonFactory().createJsonParser(in);
+                jsonParser.setCodec(new ObjectMapper());
+                JsonToken token = jsonParser.nextToken();
+                if (token == JsonToken.START_ARRAY) {
+                    array = true;
+                    token = jsonParser.nextToken();
+                } else {
+                    array = false;
+                }
+                if (token == JsonToken.START_OBJECT) {
+                    firstJsonNode = jsonParser.readValueAsTree();
+                } else {
+                    firstJsonNode = null;
+                }
+            } catch (final JsonParseException e) {
+                throw new MalformedRecordException("Could not parse data as JSON", e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            jsonParser.close();
+        }
+
+        /**
+         * Returns the next record, or null when the input is exhausted.
+         *
+         * @throws IOException if the underlying stream cannot be read
+         * @throws MalformedRecordException if the JSON is invalid or cannot be converted using the schema
+         */
+        @Override
+        public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
+            // a non-array input holds at most one object
+            if (firstObjectConsumed && !array) {
+                return null;
+            }
+            try {
+                return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields);
+            } catch (final MalformedRecordException mre) {
+                throw mre;
+            } catch (final IOException ioe) {
+                throw ioe;
+            } catch (final Exception e) {
+                throw new MalformedRecordException("Failed to convert data into a Record object with the given schema", e);
+            }
+        }
+
+        @Override
+        public RecordSchema getSchema() throws MalformedRecordException {
+            return recordSchema;
+        }
+
+        /**
+         * Returns the next JSON object of the input, or null when there is none left.
+         */
+        private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
+            if (!firstObjectConsumed) {
+                firstObjectConsumed = true;
+                return firstJsonNode;
+            }
+            while (true) {
+                final JsonToken token = jsonParser.nextToken();
+                if (token == null) {
+                    return null;
+                }
+                switch (token) {
+                    case END_OBJECT:
+                        continue;
+                    case START_OBJECT:
+                        return jsonParser.readValueAsTree();
+                    case END_ARRAY:
+                    case START_ARRAY:
+                        return null;
+                    default:
+                        throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
+                }
+            }
+        }
+
+        /**
+         * Converts a JSON object into a Record using the given schema.
+         *
+         * @param jsonNode the object to convert; may be null when the input is exhausted
+         * @param schema the schema of the record to create
+         * @param fieldNamePrefix prefix prepended to field names passed to the type conversion, or null
+         * @param coerceTypes whether values are converted to the types declared by the schema
+         * @param dropUnknown whether only fields declared by the schema are kept
+         * @return the record, or null when {@code jsonNode} is null
+         */
+        private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
+                final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
+
+            // getNextJsonNode() returns null at the end of an array or for empty input; report
+            // "no more records" instead of failing on the null node below.
+            if (jsonNode == null) {
+                return null;
+            }
+
+            final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
+
+            if (dropUnknown) {
+                // schema-driven: only look up the fields the schema declares
+                for (final RecordField recordField : schema.getFields()) {
+                    final JsonNode childNode = getChildNode(jsonNode, recordField);
+                    if (childNode == null) {
+                        continue;
+                    }
+
+                    final String fieldName = recordField.getFieldName();
+                    final Object value;
+
+                    if (coerceTypes) {
+                        final DataType desiredType = recordField.getDataType();
+                        final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                        value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                    } else {
+                        value = getRawNodeValue(childNode, recordField.getDataType());
+                    }
+
+                    values.put(fieldName, value);
+                }
+            } else {
+                // data-driven: keep every field present in the JSON, known to the schema or not
+                final Iterator<String> fieldNames = jsonNode.getFieldNames();
+                while (fieldNames.hasNext()) {
+                    final String fieldName = fieldNames.next();
+                    final JsonNode childNode = jsonNode.get(fieldName);
+                    final RecordField recordField = schema.getField(fieldName).orElse(null);
+                    final Object value;
+
+                    if (coerceTypes && recordField != null) {
+                        final DataType desiredType = recordField.getDataType();
+                        final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                        value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                    } else {
+                        value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                    }
+
+                    values.put(fieldName, value);
+                }
+            }
+
+            final Supplier<String> supplier = () -> jsonNode.toString();
+            return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
+        }
+
+        /**
+         * Looks up the child of the given node by the field's name, falling back to its aliases.
+         *
+         * @return the matching child node, or null if neither the name nor any alias is present
+         */
+        private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
+            if (jsonNode.has(field.getFieldName())) {
+                return jsonNode.get(field.getFieldName());
+            }
+            for (final String alias : field.getAliases()) {
+                if (jsonNode.has(alias)) {
+                    return jsonNode.get(alias);
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Converts a JSON node into a value of the desired record data type.
+         *
+         * @param fieldNode the node to convert
+         * @param fieldName the name of the field, passed on to the type conversion
+         * @param desiredType the type the value should be converted to
+         * @param dropUnknown passed through when nested records are converted
+         * @return the converted value; null for a missing or JSON-null node, for a RECORD type whose
+         *         node is not an object or whose data type is not a RecordDataType, and for any
+         *         field type not handled by the switch
+         */
+        protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final boolean dropUnknown) throws IOException, MalformedRecordException {
+            if (fieldNode == null || fieldNode.isNull()) {
+                return null;
+            }
+
+            switch (desiredType.getFieldType()) {
+                case BOOLEAN:
+                case BYTE:
+                case CHAR:
+                case DOUBLE:
+                case FLOAT:
+                case INT:
+                case BIGINT:
+                case LONG:
+                case SHORT:
+                case STRING:
+                case DATE:
+                case TIME:
+                case TIMESTAMP: {
+                    final Object rawValue = getRawNodeValue(fieldNode, null);
+                    final Object converted = DataTypeUtils.convertType(rawValue, desiredType, dateFormat, timeFormat, timestampFormat, fieldName);
+                    return converted;
+                }
+                case MAP: {
+                    final DataType valueType = ((MapDataType) desiredType).getValueType();
+
+                    final Map<String, Object> map = new HashMap<>();
+                    final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+                    while (fieldNameItr.hasNext()) {
+                        final String childName = fieldNameItr.next();
+                        final JsonNode childNode = fieldNode.get(childName);
+                        final Object childValue = convertField(childNode, fieldName, valueType, dropUnknown);
+                        map.put(childName, childValue);
+                    }
+
+                    return map;
+                }
+                case ARRAY: {
+                    final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                    final int numElements = arrayNode.size();
+                    final Object[] arrayElements = new Object[numElements];
+                    int count = 0;
+                    for (final JsonNode node : arrayNode) {
+                        final DataType elementType = ((ArrayDataType) desiredType).getElementType();
+                        final Object converted = convertField(node, fieldName, elementType, dropUnknown);
+                        arrayElements[count++] = converted;
+                    }
+
+                    return arrayElements;
+                }
+                case RECORD: {
+                    if (fieldNode.isObject()) {
+                        RecordSchema childSchema;
+                        if (desiredType instanceof RecordDataType) {
+                            childSchema = ((RecordDataType) desiredType).getChildSchema();
+                        } else {
+                            return null;
+                        }
+
+                        if (childSchema == null) {
+                            // no schema declared for the nested record: treat every field as a String
+                            final List<RecordField> fields = new ArrayList<>();
+                            final Iterator<String> fieldNameItr = fieldNode.getFieldNames();
+                            while (fieldNameItr.hasNext()) {
+                                fields.add(new RecordField(fieldNameItr.next(), RecordFieldType.STRING.getDataType()));
+                            }
+
+                            childSchema = new SimpleRecordSchema(fields);
+                        }
+
+                        return convertJsonNodeToRecord(fieldNode, childSchema, fieldName + ".", true, dropUnknown);
+                    } else {
+                        return null;
+                    }
+                }
+                case CHOICE: {
+                    return DataTypeUtils.convertType(getRawNodeValue(fieldNode, null), desiredType, fieldName);
+                }
+            }
+
+            return null;
+        }
+
+        /**
+         * Extracts the value of a JSON node without converting it to a schema type: numbers,
+         * binary data, booleans and text are returned as given by the node, arrays become
+         * {@code Object[]} and objects become a {@link MapRecord}.
+         *
+         * @param fieldNode the node to read
+         * @param dataType the declared type of the node, used to find the element type of arrays
+         *                 and the child schema of records; may be null
+         * @return the raw value, or null for a missing or JSON-null node
+         */
+        protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
+            if (fieldNode == null || fieldNode.isNull()) {
+                return null;
+            }
+
+            if (fieldNode.isNumber()) {
+                return fieldNode.getNumberValue();
+            }
+
+            if (fieldNode.isBinary()) {
+                return fieldNode.getBinaryValue();
+            }
+
+            if (fieldNode.isBoolean()) {
+                return fieldNode.getBooleanValue();
+            }
+
+            if (fieldNode.isTextual()) {
+                return fieldNode.getTextValue();
+            }
+
+            if (fieldNode.isArray()) {
+                final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                final int numElements = arrayNode.size();
+                final Object[] arrayElements = new Object[numElements];
+                int count = 0;
+
+                final DataType elementDataType;
+                if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
+                    final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                    elementDataType = arrayDataType.getElementType();
+                } else {
+                    elementDataType = null;
+                }
+
+                for (final JsonNode node : arrayNode) {
+                    final Object value = getRawNodeValue(node, elementDataType);
+                    arrayElements[count++] = value;
+                }
+
+                return arrayElements;
+            }
+
+            if (fieldNode.isObject()) {
+                RecordSchema childSchema;
+                if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
+                    final RecordDataType recordDataType = (RecordDataType) dataType;
+                    childSchema = recordDataType.getChildSchema();
+                } else {
+                    childSchema = null;
+                }
+
+                if (childSchema == null) {
+                    childSchema = new SimpleRecordSchema(Collections.emptyList());
+                }
+
+                final Iterator<String> fieldNames = fieldNode.getFieldNames();
+                final Map<String, Object> childValues = new HashMap<>();
+                while (fieldNames.hasNext()) {
+                    final String childFieldName = fieldNames.next();
+                    // NOTE(review): the parent's dataType is passed for each child rather than the
+                    // child's own field type from childSchema - confirm this is intended.
+                    final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
+                    childValues.put(childFieldName, childValue);
+                }
+
+                final MapRecord record = new MapRecord(childSchema, childValues);
+                return record;
+            }
+
+            return null;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index fac7696..20ed96a 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -68,9 +68,6 @@ import java.util.concurrent.TimeUnit;
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .description("The value to use for the platform field in each provenance event.")
@@ -195,7 +192,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
         lastSentBulletinId = currMaxId;
     }
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
         final String platform, final String nodeIdentifier) {
 
         addField(builder, "objectId", UUID.randomUUID().toString());
@@ -216,17 +213,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
         return builder.build();
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String key, final String value) {
-        if (value == null) {
-            return;
-        }
-        builder.add(key, value);
-    }
-
 }