You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/06 20:59:07 UTC
[nifi] branch master updated: NIFI-6942 This closes #3934. Added a
reporting task to push provenance data to azure log analytics.
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 65ca8e6 NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.
65ca8e6 is described below
commit 65ca8e6c8defbc82bbac8c510cf16621a34418e9
Author: Ubuntu <ba...@ubuntu.bsz2odxhlxmeloyl4zwvdtk5oh.xx.internal.cloudapp.net>
AuthorDate: Fri Dec 13 20:12:28 2019 +0000
NIFI-6942 This closes #3934. Added a reporting task to push provenance data to azure log analytics.
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../nifi-azure-reporting-task/pom.xml | 6 +
.../AbstractAzureLogAnalyticsReportingTask.java | 165 ++++++++
.../AzureLogAnalyticsProvenanceReportingTask.java | 470 +++++++++++++++++++++
.../AzureLogAnalyticsReportingTask.java | 233 +++-------
.../org.apache.nifi.reporting.ReportingTask | 3 +-
...stAzureLogAnalyticsProvenanceReportingTask.java | 96 +++++
6 files changed, 788 insertions(+), 185 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
index 79eb37d..3e14afc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
@@ -59,5 +59,11 @@
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-reporting-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
new file mode 100644
index 0000000..d48c765
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.text.MessageFormat;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+
+/**
+ * Abstract ReportingTask to send metrics from Apache NiFi and JVM to Azure
+ * Monitor.
+ */
+public abstract class AbstractAzureLogAnalyticsReportingTask extends AbstractReportingTask {
+
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final String HMAC_SHA256_ALG = "HmacSHA256";
+    // Date format required for the x-ms-date header of the Log Analytics HTTP
+    // Data Collector API (RFC 1123, e.g. "Mon, 06 Jan 2020 20:59:07 GMT").
+    private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter
+            .ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
+
+    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
+            .name("Log Analytics Workspace Id").description("Log Analytics Workspace Id").required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
+    static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
+            .name("Log Analytics Workspace Key").description("Azure Log Analytics Workspace Key").required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
+    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder().name("Application ID")
+            .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
+            .required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder().name("Instance ID")
+            .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
+            .required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("${hostname(true)}").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder().name("Process group ID(s)")
+            .description(
+                    "If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+                            + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
+            .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.createListValidator(true, true,
+                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
+            .build();
+    static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder().name("Job Name")
+            .description("The name of the exporting job").defaultValue("nifi_reporting_job")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+    static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
+            .name("Log Analytics URL Endpoint Format").description("Log Analytics URL Endpoint Format").required(false)
+            .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
+
+    /**
+     * Builds the "SharedKey" Authorization header value for the Log Analytics HTTP
+     * Data Collector API: an HMAC-SHA256 signature over the POST metadata, keyed
+     * with the base64-decoded workspace key.
+     *
+     * @param workspaceId   Log Analytics workspace id
+     * @param key           base64-encoded workspace key
+     * @param contentLength length, in bytes, of the JSON body that will be posted
+     * @param rfc1123Date   request date, formatted per RFC 1123 (the same value sent in the x-ms-date header)
+     * @return the Authorization header value ("SharedKey workspaceId:signature")
+     */
+    protected String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
+        try {
+            String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength,
+                    rfc1123Date);
+            Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
+            mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
+            String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
+            return String.format("SharedKey %s:%s", workspaceId, hmac);
+        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(LOG_ANALYTICS_WORKSPACE_ID);
+        properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
+        properties.add(APPLICATION_ID);
+        properties.add(INSTANCE_ID);
+        properties.add(PROCESS_GROUP_IDS);
+        properties.add(JOB_NAME);
+        properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+        return properties;
+    }
+
+    /**
+     * Construct HttpPost and return it
+     *
+     * @param urlFormat   URL format to Azure Log Analytics Endpoint
+     * @param workspaceId your azure log analytics workspace id
+     * @param logName     log table name where metrics will be pushed
+     * @return HttpPost targeting your azure log analytics workspace
+     * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
+     */
+    protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
+            throws IllegalArgumentException {
+        String dataCollectorEndpoint = MessageFormat.format(urlFormat, workspaceId);
+        HttpPost post = new HttpPost(dataCollectorEndpoint);
+        post.addHeader("Content-Type", "application/json");
+        post.addHeader("Log-Type", logName);
+        return post;
+    }
+
+    /**
+     * Signs the given JSON payload and posts it to the Log Analytics workspace.
+     *
+     * @param request         prepared HttpPost (endpoint, Content-Type and Log-Type headers already set)
+     * @param workspaceId     Log Analytics workspace id
+     * @param linuxPrimaryKey base64-encoded workspace key used to sign the request
+     * @param rawJson         JSON payload to send
+     * @throws IOException      if executing the HTTP request fails
+     * @throws RuntimeException if the service responds with a non-200 status
+     */
+    protected void sendToLogAnalytics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+            final String rawJson) throws IllegalArgumentException, RuntimeException, IOException {
+        final int bodyLength = rawJson.getBytes(UTF8).length;
+        final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
+        final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
+        request.addHeader("Authorization", createAuthorization);
+        request.addHeader("x-ms-date", nowRfc1123);
+        // The signature above covers the UTF-8 byte length of the body, so the entity
+        // must be UTF-8 encoded as well: StringEntity's single-argument constructor
+        // defaults to ISO-8859-1, which would break the signature for non-ASCII
+        // payloads. The entity is also set exactly once (the original set it twice).
+        request.setEntity(new StringEntity(rawJson, UTF8));
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            postRequest(httpClient, request);
+        }
+    }
+
+    /**
+     * post request with httpClient and httpPost
+     *
+     * @param httpClient HttpClient
+     * @param request    HttpPost
+     * @throws IOException      if httpClient.execute fails
+     * @throws RuntimeException if post request status return other than 200
+     */
+    protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
+            throws IOException, RuntimeException {
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            if (response != null && response.getStatusLine().getStatusCode() != 200) {
+                throw new RuntimeException(response.getStatusLine().toString());
+            }
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..da8cfa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+
+@Tags({ "azure", "provenance", "reporting", "log analytics" })
+@CapabilityDescription("Publishes Provenance events to an Azure Log Analytics workspace.")
+public class AzureLogAnalyticsProvenanceReportingTask extends AbstractAzureLogAnalyticsReportingTask {
+
+ protected static final String LAST_EVENT_ID_KEY = "last_event_id";
+ protected static final String DESTINATION_URL_PATH = "/nifi";
+ protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+ static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
+ .name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
+ .defaultValue("nifiprovenance").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
+
+ static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream",
+ "Beginning of Stream",
+ "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+
+ static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+ "Start reading provenance Events from the end of the stream, ignoring old events");
+
+ static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-event-filter").displayName("Event Type to Include")
+ .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+ + "Available event types are "
+ + Arrays.deepToString(ProvenanceEventType.values())
+ + ". If no filter is set, all the events are sent. If "
+ + "multiple filters are set, the filters are cumulative.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-event-filter-exclude").displayName("Event Type to Exclude")
+ .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+ + "Available event types are "
+ + Arrays.deepToString(ProvenanceEventType.values())
+ + ". If no filter is set, all the events are sent. If "
+ + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+ + "exclusion takes precedence and the event will not be sent.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-type-filter").displayName("Component Type to Include")
+ .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-type-filter-exclude").displayName("Component Type to Exclude")
+ .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+ + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-id-filter").displayName("Component ID to Include")
+ .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+ + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-id-filter-exclude").displayName("Component ID to Exclude")
+ .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+ + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+ + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-name-filter").displayName("Component Name to Include")
+ .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular "
+ + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+ static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-name-filter-exclude").displayName("Component Name to Exclude")
+ .description("Regular expression to exclude the provenance events based on the component name. The events matching the regular "
+ + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ + "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+ .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+ static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("start-position")
+ .displayName("Start Position")
+ .description("If the Reporting Task has never been run, or if its state has been reset by a user, "
+ + "specifies where in the stream of Provenance Events the Reporting Task should start")
+ .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+ .defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
+
+ static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder().name("include-null-values")
+ .displayName("Include Null Values")
+ .description("Indicate if null values should be included in records. Default will be false")
+ .required(true).allowableValues("true", "false").defaultValue("false").build();
+
+ static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform")
+ .description("The value to use for the platform field in each event.").required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder().name("Instance URL")
+ .displayName("Instance URL")
+ .description("The URL of this instance to use in the Content URI of each event.").required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("http://${hostname(true)}:8080/nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size")
+ .displayName("Batch Size")
+ .description("Specifies how many records to send in a single batch, at most.").required(true)
+ .defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+ private ConfigurationContext context;
+
+ private volatile ProvenanceEventConsumer consumer;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        // Returns the supported properties in display order. Note that, unlike the
+        // abstract base class, PROCESS_GROUP_IDS is not part of this task's list.
+        return new ArrayList<>(Arrays.asList(
+                LOG_ANALYTICS_WORKSPACE_ID,
+                LOG_ANALYTICS_CUSTOM_LOG_NAME,
+                LOG_ANALYTICS_WORKSPACE_KEY,
+                APPLICATION_ID,
+                INSTANCE_ID,
+                JOB_NAME,
+                LOG_ANALYTICS_URL_ENDPOINT_FORMAT,
+                FILTER_EVENT_TYPE,
+                FILTER_EVENT_TYPE_EXCLUDE,
+                FILTER_COMPONENT_TYPE,
+                FILTER_COMPONENT_TYPE_EXCLUDE,
+                FILTER_COMPONENT_ID,
+                FILTER_COMPONENT_ID_EXCLUDE,
+                FILTER_COMPONENT_NAME,
+                FILTER_COMPONENT_NAME_EXCLUDE,
+                START_POSITION,
+                ALLOW_NULL_VALUES,
+                PLATFORM,
+                INSTANCE_URL,
+                BATCH_SIZE));
+    }
+
+    /**
+     * Lazily creates and configures the shared ProvenanceEventConsumer from the
+     * task's properties (start position, batch size and the event-type / component
+     * include-exclude filters), then marks it as scheduled. A no-op once the
+     * consumer already exists.
+     *
+     * NOTE(review): the UpperCamelCase method name is unconventional for a Java
+     * method, but renaming it would change the class's public interface.
+     *
+     * @param context reporting context supplying the filter/configuration properties
+     */
+    public void CreateConsumer(final ReportingContext context) {
+        if (consumer != null)
+            return;
+        consumer = new ProvenanceEventConsumer();
+        consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
+        consumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
+        consumer.setLogger(getLogger());
+        // initialize component type filtering
+        consumer.setComponentTypeRegex(
+                context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
+        consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE)
+                .evaluateAttributeExpressions().getValue());
+        consumer.setComponentNameRegex(
+                context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
+        consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE)
+                .evaluateAttributeExpressions().getValue());
+
+        // initialize event type filtering; names that do not match a
+        // ProvenanceEventType constant are logged and skipped rather than failing
+        final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(
+                context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), ','));
+        if (targetEventTypes != null) {
+            for (final String type : targetEventTypes) {
+                try {
+                    consumer.addTargetEventType(ProvenanceEventType.valueOf(type));
+                } catch (final Exception e) {
+                    getLogger().warn(type
+                            + " is not a correct event type, removed from the filtering.");
+                }
+            }
+        }
+
+        final String[] targetEventTypesExclude = StringUtils
+                .stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE)
+                        .evaluateAttributeExpressions().getValue(), ','));
+        if (targetEventTypesExclude != null) {
+            for (final String type : targetEventTypesExclude) {
+                try {
+                    consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+                } catch (final Exception e) {
+                    getLogger().warn(type
+                            + " is not a correct event type, removed from the exclude filtering.");
+                }
+            }
+        }
+
+        // initialize component ID filtering
+        final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(
+                context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(),
+                ','));
+        if (targetComponentIds != null) {
+            consumer.addTargetComponentId(targetComponentIds);
+        }
+
+        final String[] targetComponentIdsExclude = StringUtils
+                .stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE)
+                        .evaluateAttributeExpressions().getValue(), ','));
+        if (targetComponentIdsExclude != null) {
+            consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+        }
+
+        consumer.setScheduled(true);
+    }
+
+    @Override
+    public void onTrigger(ReportingContext context) {
+        // In a cluster, wait until this node's identifier has been assigned before
+        // reporting, so events can be attributed to the correct node.
+        if (context.isClustered() && context.getClusterNodeIdentifier() == null) {
+            getLogger().debug(
+                    "This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                            + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        try {
+            processProvenanceData(context);
+        } catch (final Exception e) {
+            getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
+        }
+    }
+
+ public void processProvenanceData(final ReportingContext context) throws IOException {
+ getLogger().debug("Starting to process provenance data");
+ final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID)
+ .evaluateAttributeExpressions().getValue();
+ final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY)
+ .evaluateAttributeExpressions().getValue();
+ final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+ .getValue();
+ final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+ .evaluateAttributeExpressions().getValue();
+ final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final String dataCollectorEndpoint = MessageFormat.format(urlEndpointFormat, workspaceId);
+ final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+ final String nodeId = context.getClusterNodeIdentifier();
+ final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+ final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+ final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
+ final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+ URL url;
+ try {
+ url = new URL(nifiUrl);
+ } catch (final MalformedURLException e1) {
+ throw new AssertionError();
+ }
+
+ final String hostname = url.getHost();
+ final Map<String, Object> config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+ final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+ df.setTimeZone(TimeZone.getTimeZone("Z"));
+ CreateConsumer(context);
+ consumer.consumeEvents(context, (mapHolder, events) -> {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append('[');
+ for (final ProvenanceEventRecord event : events) {
+ final String componentName = mapHolder.getComponentName(event.getComponentId());
+ final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(),
+ event.getComponentType());
+ final String processGroupName = mapHolder.getComponentName(processGroupId);
+ final JsonObject jo = serialize(factory, builder, event, df, componentName,
+ processGroupId, processGroupName, hostname, url, rootGroupName,
+ platform, nodeId, allowNullValues);
+ stringBuilder.append(jo.toString());
+ stringBuilder.append(',');
+ }
+ if (stringBuilder.charAt(stringBuilder.length() - 1) == ',')
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ stringBuilder.append(']');
+ String str = stringBuilder.toString();
+ if (!str.equals("[]")) {
+ final HttpPost httpPost = new HttpPost(dataCollectorEndpoint);
+ httpPost.addHeader("Content-Type", "application/json");
+ httpPost.addHeader("Log-Type", logName);
+ getLogger().debug("Sending " + batchSize + " events of length " + str.length() + " to azure log analytics " + logName);
+ try {
+ sendToLogAnalytics(httpPost, workspaceId, linuxPrimaryKey, str);
+
+ } catch (final Exception e) {
+ getLogger().error("Failed to publish provenance data to Azure Log Analytics", e);
+ }
+ }
+ });
+ getLogger().debug("Done processing provenance data");
+ }
+
+ private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder,
+ final ProvenanceEventRecord event, final DateFormat df, final String componentName,
+ final String processGroupId, final String processGroupName, final String hostname,
+ final URL nifiUrl, final String applicationName, final String platform,
+ final String nodeIdentifier, Boolean allowNullValues) {
+ addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
+ addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
+ addField(builder, "eventType", event.getEventType().name(), allowNullValues);
+ addField(builder, "timestampMillis", event.getEventTime(), allowNullValues);
+ addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues);
+ addField(builder, "durationMillis", event.getEventDuration(), allowNullValues);
+ addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues);
+ addField(builder, "details", event.getDetails(), allowNullValues);
+ addField(builder, "componentId", event.getComponentId(), allowNullValues);
+ addField(builder, "componentType", event.getComponentType(), allowNullValues);
+ addField(builder, "componentName", componentName, allowNullValues);
+ addField(builder, "processGroupId", processGroupId, allowNullValues);
+ addField(builder, "processGroupName", processGroupName, allowNullValues);
+ addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues);
+ addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues);
+ addField(builder, "entitySize", event.getFileSize(), allowNullValues);
+ addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues);
+ addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
+ addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
+
+ addField(builder, "actorHostname", hostname, allowNullValues);
+ if (nifiUrl != null) {
+ // To get URL Prefix, we just remove the /nifi from the end of the URL. We know
+ // that the URL ends with
+ // "/nifi" because the Property Validator enforces it
+ final String urlString = nifiUrl.toString();
+ final String urlPrefix = urlString.substring(0,
+ urlString.length() - DESTINATION_URL_PATH.length());
+
+ final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId()
+ + "/content/";
+ final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
+ addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues);
+ addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix,
+ allowNullValues);
+ }
+
+ addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
+ addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
+ addField(builder, "transitUri", event.getTransitUri(), allowNullValues);
+ addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues);
+ addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues);
+ addField(builder, "platform", platform, allowNullValues);
+ addField(builder, "application", applicationName, allowNullValues);
+ return builder.build();
+ }
+
+ @OnUnscheduled
+ public void onUnscheduled() {
+ if (consumer != null) {
+ getLogger().debug("Disabling schedule to consume provenance data.");
+ consumer.setScheduled(false);
+ }
+ }
+
+ public static void addField(final JsonObjectBuilder builder, final String key, final Object value,
+ boolean allowNullValues) {
+ if (value != null) {
+ if (value instanceof String) {
+ builder.add(key, (String) value);
+ } else if (value instanceof Integer) {
+ builder.add(key, (Integer) value);
+ } else if (value instanceof Boolean) {
+ builder.add(key, (Boolean) value);
+ } else if (value instanceof Long) {
+ builder.add(key, (Long) value);
+ } else {
+ builder.add(key, value.toString());
+ }
+ } else if (allowNullValues) {
+ builder.add(key, JsonValue.NULL);
+ }
+ }
+
+ public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+ final Map<String, String> values, Boolean allowNullValues) {
+ if (values != null) {
+ final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
+ for (final Map.Entry<String, String> entry : values.entrySet()) {
+
+ if (entry.getKey() == null) {
+ continue;
+ } else if (entry.getValue() == null) {
+ if (allowNullValues) {
+ mapBuilder.add(entry.getKey(), JsonValue.NULL);
+ }
+ } else {
+ mapBuilder.add(entry.getKey(), entry.getValue());
+ }
+ }
+
+ builder.add(key, mapBuilder);
+
+ } else if (allowNullValues) {
+ builder.add(key, JsonValue.NULL);
+ }
+ }
+
+ public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+ final Collection<String> values, Boolean allowNullValues) {
+ if (values != null) {
+ builder.add(key, createJsonArray(factory, values));
+ } else if (allowNullValues) {
+ builder.add(key, JsonValue.NULL);
+ }
+ }
+
+ private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
+ final JsonArrayBuilder builder = factory.createArrayBuilder();
+ for (final String value : values) {
+ if (value != null) {
+ builder.add(value);
+ }
+ }
+ return builder;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
index 8d64d2f..f9d9a82 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
@@ -17,29 +17,13 @@
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.text.MessageFormat;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
-import java.util.regex.Pattern;
-
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import javax.xml.bind.DatatypeConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -51,112 +35,29 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
-
/**
* ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
*/
-@Tags({"reporting", "loganalytics", "metrics"})
-@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace." +
- "Apache NiFi-metrics can be either configured global or on process-group level.")
+@Tags({ "azure", "metrics", "reporting", "log analytics" })
+@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace."
+ + "Apache NiFi-metrics can be either configured global or on process-group level.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
+public class AzureLogAnalyticsReportingTask extends AbstractAzureLogAnalyticsReportingTask {
private static final String JVM_JOB_NAME = "jvm_global";
- private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final String HMAC_SHA256_ALG = "HmacSHA256";
- private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
private final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
-
- static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
- .name("Log Analytics Workspace Id")
- .description("Log Analytics Workspace Id")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
+ static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder().name("Send JVM Metrics")
+ .description("Send JVM Metrics in addition to the NiFi-metrics").allowableValues("true", "false")
+ .defaultValue("false").required(true).build();
static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
- .name("Log Analytics Custom Log Name")
- .description("Log Analytics Custom Log Name")
- .required(false)
- .defaultValue("nifimetrics")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
- static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
- .name("Log Analytics Workspace Key")
- .description("Azure Log Analytic Worskspace Key")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
- static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
- .name("Application ID")
- .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .defaultValue("nifi")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
- .name("Instance ID")
- .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .defaultValue("${hostname(true)}")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
- .name("Process group ID(s)")
- .description("If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
- + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators
- .createListValidator(true, true, StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
- .build();
- static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
- .name("Job Name")
- .description("The name of the exporting job")
- .defaultValue("nifi_reporting_job")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
- .name("Send JVM Metrics")
- .description("Send JVM Metrics in addition to the NiFi-metrics")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
- static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
- .name("Log Analytics URL Endpoint Format")
- .description("Log Analytics URL Endpoint Format")
- .required(false)
- .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- private String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
- try {
- // Documentation: https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
- String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, rfc1123Date);
- Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
- mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
- String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
- return String.format("SharedKey %s:%s", workspaceId, hmac);
- } catch (NoSuchAlgorithmException | InvalidKeyException e) {
- throw new RuntimeException(e);
- }
- }
+ .name("Log Analytics Custom Log Name").description("Log Analytics Custom Log Name").required(false)
+ .defaultValue("nifimetrics").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -168,31 +69,34 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
properties.add(INSTANCE_ID);
properties.add(PROCESS_GROUP_IDS);
properties.add(JOB_NAME);
- properties.add(SEND_JVM_METRICS);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
return properties;
}
@Override
- public void onTrigger(final ReportingContext context){
- final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
- final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
+ public void onTrigger(final ReportingContext context) {
+ final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions()
+ .getValue();
+ final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions()
+ .getValue();
final boolean jvmMetricsCollected = context.getProperty(SEND_JVM_METRICS).asBoolean();
-
- final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
+ final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+ .getValue();
final String instanceId = context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
final String groupIds = context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
- final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
+ final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+ .evaluateAttributeExpressions().getValue();
+
try {
List<Metric> allMetrics = null;
- if(groupIds == null || groupIds.isEmpty()) {
+ if (groupIds == null || groupIds.isEmpty()) {
ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
String processGroupName = status.getName();
allMetrics = collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected);
} else {
allMetrics = new ArrayList<>();
- for(String groupId: groupIds.split(",")) {
- groupId =groupId.trim();
+ for (String groupId : groupIds.split(",")) {
+ groupId = groupId.trim();
ProcessGroupStatus status = context.getEventAccess().getGroupStatus(groupId);
String processGroupName = status.getName();
allMetrics.addAll(collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected));
@@ -206,35 +110,21 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
}
/**
- * Construct HttpPost and return it
- * @param urlFormat URL format to Azure Log Analytics Endpoint
- * @param workspaceId your azure log analytics workspace id
- * @param logName log table name where metrics will be pushed
- * @return HttpsURLConnection to your azure log analytics workspace
- * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
- */
- protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
- throws IllegalArgumentException {
- String dataCollectorEndpoint =
- MessageFormat.format(urlFormat, workspaceId);
- HttpPost post = new HttpPost(dataCollectorEndpoint);
- post.addHeader("Content-Type", "application/json");
- post.addHeader("Log-Type", logName);
- return post;
- }
- /**
* send collected metrics to azure log analytics workspace
- * @param request HttpPost to Azure Log Analytics Endpoint
- * @param workspaceId your azure log analytics workspace id
+ *
+ * @param request HttpPost to Azure Log Analytics Endpoint
+ * @param workspaceId your azure log analytics workspace id
* @param linuxPrimaryKey your azure log analytics workspace key
- * @param allMetrics collected metrics to be sent
- * @throws IOException when there is an error in https url connection or read/write to the onnection
- * @throws IllegalArgumentException when there a exception in converting metrics to json string with Gson.toJson
- * @throws RuntimeException when httpPost fails with none 200 status code
+ * @param allMetrics collected metrics to be sent
+ * @throws IOException when there is an error in the https url
+ * connection or read/write to the connection
+ * @throws IllegalArgumentException when there is an exception in converting metrics
+ * to json string with Gson.toJson
+ * @throws RuntimeException when httpPost fails with a non-200 status
+ * code
*/
- protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
- throws IOException, IllegalArgumentException, RuntimeException {
-
+ protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+ final List<Metric> allMetrics) throws IOException, IllegalArgumentException, RuntimeException {
Gson gson = new GsonBuilder().create();
StringBuilder builder = new StringBuilder();
builder.append('[');
@@ -243,45 +133,20 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
builder.append(',');
}
builder.append(']');
-
- final String rawJson = builder.toString();
- final int bodyLength = rawJson.getBytes(UTF8).length;
- final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
- final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
- request.addHeader("Authorization", createAuthorization);
- request.addHeader("x-ms-date", nowRfc1123);
- request.setEntity(new StringEntity(rawJson));
- try(CloseableHttpClient httpClient = HttpClients.createDefault()){
- postRequest(httpClient, request);
- }
+ sendToLogAnalytics(request, workspaceId, linuxPrimaryKey, builder.toString());
}
/**
- * post request with httpClient and httpPost
- * @param httpClient HttpClient
- * @param request HttpPost
- * @throws IOException if httpClient.execute fails
- * @throws RuntimeException if post request status return other than 200
- */
- protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
- throws IOException, RuntimeException {
-
- try (CloseableHttpResponse response = httpClient.execute(request)) {
- if(response != null && response.getStatusLine().getStatusCode() != 200) {
- throw new RuntimeException(response.getStatusLine().toString());
- }
- }
- }
- /**
* collect metrics to be sent to azure log analytics workspace
- * @param instanceId instance id
- * @param status process group status
- * @param processGroupName process group name
+ *
+ * @param instanceId instance id
+ * @param status process group status
+ * @param processGroupName process group name
* @param jvmMetricsCollected whether we want to collect jvm metrics or not
* @return list of metrics collected
*/
- private List<Metric> collectMetrics(final String instanceId,
- final ProcessGroupStatus status, final String processGroupName, final boolean jvmMetricsCollected) {
+ protected List<Metric> collectMetrics(final String instanceId, final ProcessGroupStatus status,
+ final String processGroupName, final boolean jvmMetricsCollected) {
List<Metric> allMetrics = new ArrayList<>();
// dataflow process group level metrics
@@ -290,9 +155,9 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
// connections process group level metrics
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(status, connectionStatuses);
- for (ConnectionStatus connectionStatus: connectionStatuses) {
- allMetrics.addAll(
- AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId, processGroupName));
+ for (ConnectionStatus connectionStatus : connectionStatuses) {
+ allMetrics.addAll(AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus, instanceId,
+ processGroupName));
}
// processor level metrics
@@ -300,13 +165,12 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
populateProcessorStatuses(status, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
allMetrics.addAll(
- AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName)
- );
+ AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus, instanceId, processGroupName));
}
if (jvmMetricsCollected) {
allMetrics.addAll(
- AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
+ AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
}
return allMetrics;
@@ -319,7 +183,8 @@ public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
}
}
- private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
+ private void populateConnectionStatuses(final ProcessGroupStatus groupStatus,
+ final List<ConnectionStatus> statuses) {
statuses.addAll(groupStatus.getConnectionStatus());
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
populateConnectionStatuses(childGroupStatus, statuses);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index bbb8692..cbb3dc1 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsProvenanceReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..ee7f8b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObjectBuilder;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Test;
+
+public class TestAzureLogAnalyticsProvenanceReportingTask {
+
+ @Test
+ public void testAddField1() throws IOException, InterruptedException, InitializationException {
+
+ final Map<String, Object> config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyString", "StringValue", true);
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyInteger", 2674440, true);
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyLong", 1289904147324L, true);
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyBoolean", true, true);
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNotSupportedObject", 1.25, true);
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNull", null, true);
+ javax.json.JsonObject actualJson = builder.build();
+ String expectedjsonString = "{" +
+ "\"TestKeyString\": \"StringValue\"," +
+ "\"TestKeyInteger\": 2674440," +
+ "\"TestKeyLong\": 1289904147324," +
+ "\"TestKeyBoolean\": true," +
+ "\"TestKeyNotSupportedObject\": \"1.25\"," +
+ "\"TestKeyNull\": null" +
+ "}";
+ JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+ assertEquals(expectedJson.toString(), actualJson.toString());
+ }
+
+ @Test
+ public void testAddField2() throws IOException, InterruptedException, InitializationException {
+
+ final Map<String, Object> config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+ Map<String, String> values = new HashMap<String, String>();
+ values.put("TestKeyString1", "StringValue1");
+ values.put("TestKeyString2", "StringValue2");
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+ javax.json.JsonObject actualJson = builder.build();
+ String expectedjsonString = "{\"TestKeyString\":{\"TestKeyString2\":\"StringValue2\",\"TestKeyString1\":\"StringValue1\"}}";
+ JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+ assertEquals(expectedJson.toString(), actualJson.toString());
+ }
+
+ @Test
+ public void testAddField3() throws IOException, InterruptedException, InitializationException {
+
+ final Map<String, Object> config = Collections.emptyMap();
+ final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+ Collection<String> values = new ArrayList<String>();
+ values.add("TestValueString1");
+ values.add("TestValueString2");
+ AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+ javax.json.JsonObject actualJson = builder.build();
+ String expectedjsonString = "{\"TestKeyString\":[\"TestValueString1\",\"TestValueString2\"]}";
+ JsonObject expectedJson = new Gson().fromJson(expectedjsonString, JsonObject.class);
+ assertEquals(expectedJson.toString(), actualJson.toString());
+ }
+}
\ No newline at end of file