You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/06 20:59:07 UTC

[nifi] branch master updated: NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ca8e6  NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.
65ca8e6 is described below

commit 65ca8e6c8defbc82bbac8c510cf16621a34418e9
Author: Ubuntu <ba...@ubuntu.bsz2odxhlxmeloyl4zwvdtk5oh.xx.internal.cloudapp.net>
AuthorDate: Fri Dec 13 20:12:28 2019 +0000

    NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../nifi-azure-reporting-task/pom.xml              |   6 +
 .../AbstractAzureLogAnalyticsReportingTask.java    | 165 ++++++++
 .../AzureLogAnalyticsProvenanceReportingTask.java  | 470 +++++++++++++++++++++
 .../AzureLogAnalyticsReportingTask.java            | 233 +++-------
 .../org.apache.nifi.reporting.ReportingTask        |   3 +-
 ...stAzureLogAnalyticsProvenanceReportingTask.java |  96 +++++
 6 files changed, 788 insertions(+), 185 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
index 79eb37d..3e14afc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
@@ -59,5 +59,11 @@
             <version>1.11.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+    
+      <dependency>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-reporting-utils</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+      </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
new file mode 100644
index 0000000..d48c765
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.text.MessageFormat;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+
+/**
+ * Abstract ReportingTask to send metrics from Apache NiFi and JVM to Azure
+ * Monitor.
+ */
+public abstract class AbstractAzureLogAnalyticsReportingTask extends AbstractReportingTask {
+
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final String HMAC_SHA256_ALG = "HmacSHA256";
+    private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter
+            .ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
+
+    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
+            .name("Log Analytics Workspace Id").description("Log Analytics Workspace Id").required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
+    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
+            .name("Log Analytics Workspace Key").description("Azure Log Analytic Worskspace Key").required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
+    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder().name("Application ID")
+            .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
+            .required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder().name("Instance ID")
+            .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
+            .required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("${hostname(true)}").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder().name("Process group ID(s)")
+            .description(
+                    "If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+                            + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
+            .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createListValidator(true, true,
+                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
+            .build();
+    static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder().name("Job Name")
+            .description("The name of the exporting job").defaultValue("nifi_reporting_job")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
+            .name("Log Analytics URL Endpoint Format").description("Log Analytics URL Endpoint Format").required(false)
+            .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
+
+    protected String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
+        try {
+            String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength,
+                    rfc1123Date);
+            Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
+            mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
+            String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
+            return String.format("SharedKey %s:%s", workspaceId, hmac);
+        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(LOG_ANALYTICS_WORKSPACE_ID);
+        properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
+        properties.add(APPLICATION_ID);
+        properties.add(INSTANCE_ID);
+        properties.add(PROCESS_GROUP_IDS);
+        properties.add(JOB_NAME);
+        properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+        return properties;
+    }
+
+    /**
+     * Construct HttpPost and return it
+     *
+     * @param urlFormat   URL format to Azure Log Analytics Endpoint
+     * @param workspaceId your azure log analytics workspace id
+     * @param logName     log table name where metrics will be pushed
+     * @return HttpsURLConnection to your azure log analytics workspace
+     * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
+     */
+    protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
+            throws IllegalArgumentException {
+        String dataCollectorEndpoint = MessageFormat.format(urlFormat, workspaceId);
+        HttpPost post = new HttpPost(dataCollectorEndpoint);
+        post.addHeader("Content-Type", "application/json");
+        post.addHeader("Log-Type", logName);
+        return post;
+    }
+
+    protected void sendToLogAnalytics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+            final String rawJson) throws IllegalArgumentException, RuntimeException, IOException {
+        final int bodyLength = rawJson.getBytes(UTF8).length;
+        final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
+        final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
+        request.addHeader("Authorization", createAuthorization);
+        request.addHeader("x-ms-date", nowRfc1123);
+        request.setEntity(new StringEntity(rawJson));
+        request.setEntity(new StringEntity(rawJson));
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            postRequest(httpClient, request);
+        }
+    }
+
+    /**
+     * post request with httpClient and httpPost
+     *
+     * @param httpClient HttpClient
+     * @param request    HttpPost
+     * @throws IOException      if httpClient.execute fails
+     * @throws RuntimeException if post request status return other than 200
+     */
+    protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
+            throws IOException, RuntimeException {
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            if (response != null && response.getStatusLine().getStatusCode() != 200) {
+                throw new RuntimeException(response.getStatusLine().toString());
+            }
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..da8cfa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+
+@Tags({ "azure", "provenace", "reporting", "log analytics" })
+@CapabilityDescription("Publishes Provenance events to to a Azure Log Analytics workspace.")
+public class AzureLogAnalyticsProvenanceReportingTask extends AbstractAzureLogAnalyticsReportingTask {
+
+        protected static final String LAST_EVENT_ID_KEY = "last_event_id";
+        protected static final String DESTINATION_URL_PATH = "/nifi";
+        protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+        static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
+                        .name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
+                        .defaultValue("nifiprovenance").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
+
+        static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream",
+                        "Beginning of Stream",
+                        "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+
+        static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+                        "Start reading provenance Events from the end of the stream, ignoring old events");
+
+        static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-event-filter").displayName("Event Type to Include")
+                        .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+                                        + "Available event types are "
+                                        + Arrays.deepToString(ProvenanceEventType.values())
+                                        + ". If no filter is set, all the events are sent. If "
+                                        + "multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-event-filter-exclude").displayName("Event Type to Exclude")
+                        .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+                                        + "Available event types are "
+                                        + Arrays.deepToString(ProvenanceEventType.values())
+                                        + ". If no filter is set, all the events are sent. If "
+                                        + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+                                        + "exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-type-filter").displayName("Component Type to Include")
+                        .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+                                        + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-type-filter-exclude").displayName("Component Type to Exclude")
+                        .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+                                        + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+                                        + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-id-filter").displayName("Component ID to Include")
+                        .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+                                        + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-id-filter-exclude").displayName("Component ID to Exclude")
+                        .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+                                        + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+                                        + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-name-filter").displayName("Component Name to Include")
+                        .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular "
+                                        + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-name-filter-exclude").displayName("Component Name to Exclude")
+                        .description("Regular expression to exclude the provenance events based on the component name. The events matching the regular "
+                                        + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+                                        + "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("start-position")
+                        .displayName("Start Position")
+                        .description("If the Reporting Task has never been run, or if its state has been reset by a user, "
+                                        + "specifies where in the stream of Provenance Events the Reporting Task should start")
+                        .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+                        .defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
+
+        static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder().name("include-null-values")
+                        .displayName("Include Null Values")
+                        .description("Indicate if null values should be included in records. Default will be false")
+                        .required(true).allowableValues("true", "false").defaultValue("false").build();
+
+        static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform")
+                        .description("The value to use for the platform field in each event.").required(true)
+                        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder().name("Instance URL")
+                        .displayName("Instance URL")
+                        .description("The URL of this instance to use in the Content URI of each event.").required(true)
+                        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .defaultValue("http://${hostname(true)}:8080/nifi")
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size")
+                        .displayName("Batch Size")
+                        .description("Specifies how many records to send in a single batch, at most.").required(true)
+                        .defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+        private ConfigurationContext context;
+
+        private volatile ProvenanceEventConsumer consumer;
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                final List<PropertyDescriptor> properties = new ArrayList<>();
+                properties.add(LOG_ANALYTICS_WORKSPACE_ID);
+                properties.add(LOG_ANALYTICS_CUSTOM_LOG_NAME);
+                properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
+                properties.add(APPLICATION_ID);
+                properties.add(INSTANCE_ID);
+                properties.add(JOB_NAME);
+                properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+                properties.add(FILTER_EVENT_TYPE);
+                properties.add(FILTER_EVENT_TYPE_EXCLUDE);
+                properties.add(FILTER_COMPONENT_TYPE);
+                properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
+                properties.add(FILTER_COMPONENT_ID);
+                properties.add(FILTER_COMPONENT_ID_EXCLUDE);
+                properties.add(FILTER_COMPONENT_NAME);
+                properties.add(FILTER_COMPONENT_NAME_EXCLUDE);
+                properties.add(START_POSITION);
+                properties.add(ALLOW_NULL_VALUES);
+                properties.add(PLATFORM);
+                properties.add(INSTANCE_URL);
+                properties.add(BATCH_SIZE);
+                return properties;
+        }
+
+        public void CreateConsumer(final ReportingContext context) {
+                if (consumer != null)
+                        return;
+                consumer = new ProvenanceEventConsumer();
+                consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
+                consumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
+                consumer.setLogger(getLogger());
+                // initialize component type filtering
+                consumer.setComponentTypeRegex(
+                                context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
+                consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE)
+                                .evaluateAttributeExpressions().getValue());
+                consumer.setComponentNameRegex(
+                                context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
+                consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE)
+                                .evaluateAttributeExpressions().getValue());
+
+                final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(
+                                context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), ','));
+                if (targetEventTypes != null) {
+                        for (final String type : targetEventTypes) {
+                                try {
+                                        consumer.addTargetEventType(ProvenanceEventType.valueOf(type));
+                                } catch (final Exception e) {
+                                        getLogger().warn(type
+                                                        + " is not a correct event type, removed from the filtering.");
+                                }
+                        }
+                }
+
+                final String[] targetEventTypesExclude = StringUtils
+                                .stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE)
+                                                .evaluateAttributeExpressions().getValue(), ','));
+                if (targetEventTypesExclude != null) {
+                        for (final String type : targetEventTypesExclude) {
+                                try {
+                                        consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+                                } catch (final Exception e) {
+                                        getLogger().warn(type
+                                                        + " is not a correct event type, removed from the exclude filtering.");
+                                }
+                        }
+                }
+
+                // initialize component ID filtering
+                final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(
+                                context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(),
+                                ','));
+                if (targetComponentIds != null) {
+                        consumer.addTargetComponentId(targetComponentIds);
+                }
+
+                final String[] targetComponentIdsExclude = StringUtils
+                                .stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE)
+                                                .evaluateAttributeExpressions().getValue(), ','));
+                if (targetComponentIdsExclude != null) {
+                        consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+                }
+
+                consumer.setScheduled(true);
+        }
+
+        @Override
+        public void onTrigger(ReportingContext context) {
+                final boolean isClustered = context.isClustered();
+                final String nodeId = context.getClusterNodeIdentifier();
+                if (nodeId == null && isClustered) {
+                        getLogger().debug(
+                                        "This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                                                        + "Will wait for Node Identifier to be established.");
+                        return;
+                }
+
+                try {
+                        processProvenanceData(context);
+
+                } catch (final Exception e) {
+                        getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
+                }
+        }
+
+        public void processProvenanceData(final ReportingContext context) throws IOException {
+                getLogger().debug("Starting to process provenance data");
+                final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID)
+                                .evaluateAttributeExpressions().getValue();
+                final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY)
+                                .evaluateAttributeExpressions().getValue();
+                final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+                                .getValue();
+                final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+                                .evaluateAttributeExpressions().getValue();
+                final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+                final String dataCollectorEndpoint = MessageFormat.format(urlEndpointFormat, workspaceId);
+                final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+                final String nodeId = context.getClusterNodeIdentifier();
+                final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+                final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+                final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
+                final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+                URL url;
+                try {
+                        url = new URL(nifiUrl);
+                } catch (final MalformedURLException e1) {
+                        throw new AssertionError();
+                }
+
+                final String hostname = url.getHost();
+                final Map<String, Object> config = Collections.emptyMap();
+                final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+                final JsonObjectBuilder builder = factory.createObjectBuilder();
+                final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+                df.setTimeZone(TimeZone.getTimeZone("Z"));
+                CreateConsumer(context);
+                consumer.consumeEvents(context, (mapHolder, events) -> {
+                        StringBuilder stringBuilder = new StringBuilder();
+                        stringBuilder.append('[');
+                        for (final ProvenanceEventRecord event : events) {
+                                final String componentName = mapHolder.getComponentName(event.getComponentId());
+                                final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(),
+                                                event.getComponentType());
+                                final String processGroupName = mapHolder.getComponentName(processGroupId);
+                                final JsonObject jo = serialize(factory, builder, event, df, componentName,
+                                                processGroupId, processGroupName, hostname, url, rootGroupName,
+                                                platform, nodeId, allowNullValues);
+                                stringBuilder.append(jo.toString());
+                                stringBuilder.append(',');
+                        }
+                        if (stringBuilder.charAt(stringBuilder.length() - 1) == ',')
+                                stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+                        stringBuilder.append(']');
+                        String str = stringBuilder.toString();
+                        if (!str.equals("[]")) {
+                                final HttpPost httpPost = new HttpPost(dataCollectorEndpoint);
+                                httpPost.addHeader("Content-Type", "application/json");
+                                httpPost.addHeader("Log-Type", logName);
+                                getLogger().debug("Sending " + batchSize + " events of length " + str.length() + " to azure log analytics " + logName);
+                                try {
+                                        sendToLogAnalytics(httpPost, workspaceId, linuxPrimaryKey, str);
+
+                                } catch (final Exception e) {
+                                        getLogger().error("Failed to publish provenance data to Azure Log Analytics", e);
+                                }
+                        }
+                });
+                getLogger().debug("Done processing provenance data");
+        }
+
+        private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder,
+                        final ProvenanceEventRecord event, final DateFormat df, final String componentName,
+                        final String processGroupId, final String processGroupName, final String hostname,
+                        final URL nifiUrl, final String applicationName, final String platform,
+                        final String nodeIdentifier, Boolean allowNullValues) {
+                addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
+                addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
+                addField(builder, "eventType", event.getEventType().name(), allowNullValues);
+                addField(builder, "timestampMillis", event.getEventTime(), allowNullValues);
+                addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues);
+                addField(builder, "durationMillis", event.getEventDuration(), allowNullValues);
+                addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues);
+                addField(builder, "details", event.getDetails(), allowNullValues);
+                addField(builder, "componentId", event.getComponentId(), allowNullValues);
+                addField(builder, "componentType", event.getComponentType(), allowNullValues);
+                addField(builder, "componentName", componentName, allowNullValues);
+                addField(builder, "processGroupId", processGroupId, allowNullValues);
+                addField(builder, "processGroupName", processGroupName, allowNullValues);
+                addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues);
+                addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues);
+                addField(builder, "entitySize", event.getFileSize(), allowNullValues);
+                addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues);
+                addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
+                addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
+
+                addField(builder, "actorHostname", hostname, allowNullValues);
+                if (nifiUrl != null) {
+                        // TO get URL Prefix, we just remove the /nifi from the end of the URL. We know
+                        // that the URL ends with
+                        // "/nifi" because the Property Validator enforces it
+                        final String urlString = nifiUrl.toString();
+                        final String urlPrefix = urlString.substring(0,
+                                        urlString.length() - DESTINATION_URL_PATH.length());
+
+                        final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId()
+                                        + "/content/";
+                        final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
+                        addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues);
+                        addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix,
+                                        allowNullValues);
+                }
+
+                addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
+                addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
+                addField(builder, "transitUri", event.getTransitUri(), allowNullValues);
+                addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues);
+                addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues);
+                addField(builder, "platform", platform, allowNullValues);
+                addField(builder, "application", applicationName, allowNullValues);
+                return builder.build();
+        }
+
+        @OnUnscheduled
+        public void onUnscheduled() {
+                if (consumer != null) {
+                        getLogger().debug("Disabling schedule to consume provenance data.");
+                        consumer.setScheduled(false);
+                }
+        }
+
+        public static void addField(final JsonObjectBuilder builder, final String key, final Object value,
+                        boolean allowNullValues) {
+                if (value != null) {
+                        if (value instanceof String) {
+                                builder.add(key, (String) value);
+                        } else if (value instanceof Integer) {
+                                builder.add(key, (Integer) value);
+                        } else if (value instanceof Boolean) {
+                                builder.add(key, (Boolean) value);
+                        } else if (value instanceof Long) {
+                                builder.add(key, (Long) value);
+                        } else {
+                                builder.add(key, value.toString());
+                        }
+                } else if (allowNullValues) {
+                        builder.add(key, JsonValue.NULL);
+                }
+        }
+
+        public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+                        final Map<String, String> values, Boolean allowNullValues) {
+                if (values != null) {
+                        final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
+                        for (final Map.Entry<String, String> entry : values.entrySet()) {
+
+                                if (entry.getKey() == null) {
+                                        continue;
+                                } else if (entry.getValue() == null) {
+                                        if (allowNullValues) {
+                                                mapBuilder.add(entry.getKey(), JsonValue.NULL);
+                                        }
+                                } else {
+                                        mapBuilder.add(entry.getKey(), entry.getValue());
+                                }
+                        }
+
+                        builder.add(key, mapBuilder);
+
+                } else if (allowNullValues) {
+                        builder.add(key, JsonValue.NULL);
+                }
+        }
+
+        public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+                        final Collection<String> values, Boolean allowNullValues) {
+                if (values != null) {
+                        builder.add(key, createJsonArray(factory, values));
+                } else if (allowNullValues) {
+                        builder.add(key, JsonValue.NULL);
+                }
+        }
+
+        private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
+                final JsonArrayBuilder builder = factory.createArrayBuilder();
+                for (final String value : values) {
+                        if (value != null) {
+                                builder.add(value);
+                        }
+                }
+                return builder;
+        }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
index 8d64d2f..f9d9a82 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
@@ -17,29 +17,13 @@
 package org.apache.nifi.reporting.azure.loganalytics;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.text.MessageFormat;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.regex.Pattern;
-
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import javax.xml.bind.DatatypeConverter;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -51,112 +35,29 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
 import org.apache.nifi.metrics.jvm.JvmMetrics;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
-
 /**
  * ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
  */
-@Tags({"reporting", "loganalytics", "metrics"})
-@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace." +
-        "Apache NiFi-metrics can be either configured global or on process-group level.")
+@Tags({ "azure", "metrics", "reporting", "log analytics" })
+@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace."
+        + "Apache NiFi-metrics can be either configured global or on process-group level.")
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
+public class AzureLogAnalyticsReportingTask extends AbstractAzureLogAnalyticsReportingTask {
 
     private static final String JVM_JOB_NAME = "jvm_global";
-    private static final Charset UTF8 = Charset.forName("UTF-8");
-    private static final String HMAC_SHA256_ALG = "HmacSHA256";
-    private static final DateTimeFormatter RFC_1123_DATE_TIME  = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
     private final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
 
-
-    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
-            .name("Log Analytics Workspace Id")
-            .description("Log Analytics Workspace Id")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
+    static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder().name("Send JVM Metrics")
+            .description("Send JVM Metrics in addition to the NiFi-metrics").allowableValues("true", "false")
+            .defaultValue("false").required(true).build();
     static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
-            .name("Log Analytics Custom Log Name")
-            .description("Log Analytics Custom Log Name")
-            .required(false)
-            .defaultValue("nifimetrics")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
-            .name("Log Analytics Workspace Key")
-            .description("Azure Log Analytic Worskspace Key")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
-            .name("Application ID")
-            .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .defaultValue("nifi")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
-            .name("Instance ID")
-            .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .defaultValue("${hostname(true)}")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
-            .name("Process group ID(s)")
-            .description("If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
-                    + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
-            .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators
-                    .createListValidator(true, true, StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
-            .build();
-    static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
-            .name("Job Name")
-            .description("The name of the exporting job")
-            .defaultValue("nifi_reporting_job")
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
-            .name("Send JVM Metrics")
-            .description("Send JVM Metrics in addition to the NiFi-metrics")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-    static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
-            .name("Log Analytics URL Endpoint Format")
-            .description("Log Analytics URL Endpoint Format")
-            .required(false)
-            .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    private String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
-        try {
-            // Documentation: https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
-            String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, rfc1123Date);
-            Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
-            mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
-            String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
-            return String.format("SharedKey %s:%s", workspaceId, hmac);
-        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
-            throw new RuntimeException(e);
-        }
-    }
+            .name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
+            .defaultValue("nifimetrics").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -168,31 +69,34 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
         properties.add(INSTANCE_ID);
         properties.add(PROCESS_GROUP_IDS);
         properties.add(JOB_NAME);
-        properties.add(SEND_JVM_METRICS);
         properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
         return properties;
     }
 
     @Override
-    public void onTrigger(final ReportingContext context){
-        final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
-        final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
+    public void onTrigger(final ReportingContext context) {
+        final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions()
+                .getValue();
+        final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions()
+                .getValue();
         final boolean jvmMetricsCollected = context.getProperty(SEND_JVM_METRICS).asBoolean();
-
-        final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
+        final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+                .getValue();
         final String instanceId = context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
         final String groupIds = context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
-        final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
+        final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+                .evaluateAttributeExpressions().getValue();
+
         try {
             List<Metric> allMetrics = null;
-            if(groupIds == null || groupIds.isEmpty()) {
+            if (groupIds == null || groupIds.isEmpty()) {
                 ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
                 String processGroupName = status.getName();
                 allMetrics = collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected);
             } else {
                 allMetrics = new ArrayList<>();
-                for(String groupId: groupIds.split(",")) {
-                    groupId =groupId.trim();
+                for (String groupId : groupIds.split(",")) {
+                    groupId = groupId.trim();
                     ProcessGroupStatus status = context.getEventAccess().getGroupStatus(groupId);
                     String processGroupName = status.getName();
                     allMetrics.addAll(collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected));
@@ -206,35 +110,21 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
     }
 
     /**
-     *  Construct HttpPost and return it
-     * @param urlFormat URL format to Azure Log Analytics Endpoint
-     * @param workspaceId your azure log analytics workspace id
-     * @param logName log table name where metrics will be pushed
-     * @return HttpsURLConnection to your azure log analytics workspace
-     * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
-     */
-    protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
-        throws IllegalArgumentException {
-        String dataCollectorEndpoint =
-            MessageFormat.format(urlFormat, workspaceId);
-        HttpPost post = new HttpPost(dataCollectorEndpoint);
-        post.addHeader("Content-Type", "application/json");
-        post.addHeader("Log-Type", logName);
-        return post;
-    }
-    /**
      * send collected metrics to azure log analytics workspace
-     * @param request HttpPost to Azure Log Analytics Endpoint
-     * @param workspaceId your azure log analytics workspace id
+     *
+     * @param request         HttpPost to Azure Log Analytics Endpoint
+     * @param workspaceId     your azure log analytics workspace id
      * @param linuxPrimaryKey your azure log analytics workspace key
-     * @param allMetrics collected metrics to be sent
-     * @throws IOException when there is an error in https url connection or read/write to the onnection
-     * @throws IllegalArgumentException when there a exception in converting metrics to json string with Gson.toJson
-     * @throws RuntimeException when httpPost fails with none 200 status code
+     * @param allMetrics      collected metrics to be sent
+     * @throws IOException              when there is an error in https url
+     *                                  connection or read/write to the onnection
+     * @throws IllegalArgumentException when there a exception in converting metrics
+     *                                  to json string with Gson.toJson
+     * @throws RuntimeException         when httpPost fails with none 200 status
+     *                                  code
      */
-    protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
-            throws IOException, IllegalArgumentException, RuntimeException {
-
+    protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+            final List<Metric> allMetrics) throws IOException, IllegalArgumentException, RuntimeException {
         Gson gson = new GsonBuilder().create();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
@@ -243,45 +133,20 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
             builder.append(',');
         }
         builder.append(']');
-
-        final String rawJson = builder.toString();
-        final int bodyLength = rawJson.getBytes(UTF8).length;
-        final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
-        final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
-        request.addHeader("Authorization", createAuthorization);
-        request.addHeader("x-ms-date", nowRfc1123);
-        request.setEntity(new StringEntity(rawJson));
-        try(CloseableHttpClient httpClient = HttpClients.createDefault()){
-            postRequest(httpClient, request);
-        }
+        sendToLogAnalytics(request, workspaceId, linuxPrimaryKey, builder.toString());
     }
 
     /**
-     * post request with httpClient and httpPost
-     * @param httpClient HttpClient
-     * @param request HttpPost
-     * @throws IOException if httpClient.execute fails
-     * @throws RuntimeException if post request status return other than 200
-     */
-    protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
-        throws IOException, RuntimeException {
-
-        try (CloseableHttpResponse response = httpClient.execute(request)) {
-            if(response != null && response.getStatusLine().getStatusCode() != 200) {
-                throw new RuntimeException(response.getStatusLine().toString());
-            }
-        }
-    }
-    /**
      * collect metrics to be sent to azure log analytics workspace
-     * @param instanceId instance id
-     * @param status process group status
-     * @param processGroupName process group name
+     *
+     * @param instanceId          instance id
+     * @param status              process group status
+     * @param processGroupName    process group name
      * @param jvmMetricsCollected whether we want to collect jvm metrics or not
      * @return list of metrics collected
      */
-    private List<Metric> collectMetrics(final String instanceId,
-            final ProcessGroupStatus status, final String processGroupName, final boolean jvmMetricsCollected) {
+    protected List<Metric> collectMetrics(final String instanceId, final ProcessGroupStatus status,
+            final String processGroupName, final boolean jvmMetricsCollected) {
         List<Metric> allMetrics = new ArrayList<>();
 
         // dataflow process group level metrics
@@ -290,9 +155,9 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
         // connections process group level metrics
         final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
         populateConnectionStatuses(status, connectionStatuses);
-        for (ConnectionStatus connectionStatus: connectionStatuses) {
-            allMetrics.addAll(
-                AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId, processGroupName));
+        for (ConnectionStatus connectionStatus : connectionStatuses) {
+            allMetrics.addAll(AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId,
+                    processGroupName));
         }
 
         // processor level metrics
@@ -300,13 +165,12 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
         populateProcessorStatuses(status, processorStatuses);
         for (final ProcessorStatus processorStatus : processorStatuses) {
             allMetrics.addAll(
-                AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName)
-            );
+                    AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName));
         }
 
         if (jvmMetricsCollected) {
             allMetrics.addAll(
-                AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
+                    AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
 
         }
         return allMetrics;
@@ -319,7 +183,8 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
         }
     }
 
-    private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
+    private void populateConnectionStatuses(final ProcessGroupStatus groupStatus,
+            final List<ConnectionStatus> statuses) {
         statuses.addAll(groupStatus.getConnectionStatus());
         for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
             populateConnectionStatuses(childGroupStatus, statuses);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index bbb8692..cbb3dc1 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsProvenanceReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..ee7f8b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObjectBuilder;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Test;
+
+public class TestAzureLogAnalyticsProvenanceReportingTask {
+
+    @Test
+    public void testAddField1() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyString", "StringValue", true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyInteger", 2674440, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyLong", 1289904147324L, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyBoolean", true, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNotSupportedObject", 1.25, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNull", null, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{" +
+                                        "\"TestKeyString\": \"StringValue\"," +
+                                        "\"TestKeyInteger\": 2674440," +
+                                        "\"TestKeyLong\": 1289904147324," +
+                                        "\"TestKeyBoolean\": true," +
+                                        "\"TestKeyNotSupportedObject\": \"1.25\"," +
+                                        "\"TestKeyNull\": null" +
+                                    "}";
+        JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+        assertEquals(expectedJson.toString(), actualJson.toString());
+    }
+
+    @Test
+    public void testAddField2() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        Map<String, String> values = new HashMap<String, String>();
+        values.put("TestKeyString1", "StringValue1");
+        values.put("TestKeyString2", "StringValue2");
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{\"TestKeyString\":{\"TestKeyString2\":\"StringValue2\",\"TestKeyString1\":\"StringValue1\"}}";
+        JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+        assertEquals(expectedJson.toString(), actualJson.toString());
+    }
+
+    @Test
+    public void testAddField3() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        Collection<String> values = new ArrayList<String>();
+        values.add("TestValueString1");
+        values.add("TestValueString2");
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{\"TestKeyString\":[\"TestValueString1\",\"TestValueString2\"]}";
+        JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+        assertEquals(expectedJson.toString(), actualJson.toString());
+    }
+}
\ No newline at end of file