You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/21 18:03:12 UTC
[nifi] branch master updated: NIFI-6583: adding azure log analytics
reporting task to nifi-azure-bundle
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 0acdde8 NIFI-6583: adding azure log analytics reporting task to nifi-azure-bundle
0acdde8 is described below
commit 0acdde827e02304426f8fee6be0b58848100a6b8
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Tue Oct 15 18:12:07 2019 +0000
NIFI-6583: adding azure log analytics reporting task to nifi-azure-bundle
This closes #3817.
Signed-off-by: Mark Payne <ma...@hotmail.com>
---
.../nifi-azure-bundle/nifi-azure-nar/pom.xml | 8 +-
.../nifi-azure-reporting-task/pom.xml | 67 +++++
.../AzureLogAnalyticsReportingTask.java | 328 +++++++++++++++++++++
.../nifi/reporting/azure/loganalytics/Metric.java | 112 +++++++
.../reporting/azure/loganalytics/MetricNames.java | 66 +++++
.../azure/loganalytics/MetricsBuilder.java | 195 ++++++++++++
.../api/AzureLogAnalyticsMetricsFactory.java | 142 +++++++++
.../org.apache.nifi.reporting.ReportingTask | 15 +
.../additionalDetails.html | 54 ++++
.../TestAzureLogAnalyticsReportingTask.java | 255 ++++++++++++++++
.../azure/loganalytics/TestMetricsFactory.java | 79 +++++
.../azure/loganalytics/TestVerification.java | 65 ++++
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 1 +
13 files changed, 1385 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index 762d340..723d4bd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -35,13 +35,17 @@
<artifactId>nifi-azure-processors</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-reporting-task</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api-nar</artifactId>
<version>1.11.0-SNAPSHOT</version>
<type>nar</type>
- </dependency>
+ </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
new file mode 100644
index 0000000..ee0dccb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-bundle</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-azure-reporting-task</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <nifi.version>1.10.0-SNAPSHOT</nifi.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-metrics</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.5</version>
+ </dependency>
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
new file mode 100644
index 0000000..8d64d2f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.text.MessageFormat;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Pattern;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.metrics.jvm.JvmMetrics;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+
+/**
+ * ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
+ */
+@Tags({"reporting", "loganalytics", "metrics"})
+@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a Azure Log Analytics workspace." +
+ "Apache NiFi-metrics can be either configured global or on process-group level.")
+@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
+
+ // Label passed to the metrics factory when JVM metrics are collected.
+ private static final String JVM_JOB_NAME = "jvm_global";
+ // Charset used both for signing the request and for measuring the body length.
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final String HMAC_SHA256_ALG = "HmacSHA256";
+ // Timestamp format for the x-ms-date header. The locale is pinned so the day and month
+ // abbreviations stay English whatever the JVM default locale is; without it, a host running
+ // under e.g. a German locale would emit a localized date in the header and in the signed string.
+ private static final DateTimeFormatter RFC_1123_DATE_TIME =
+     DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O", Locale.US);
+ private final JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
+
+
+ // Workspace identifier; substituted into the endpoint URL format and used in the
+ // Authorization header. Required, sensitive, supports variable-registry expressions.
+ static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
+ .name("Log Analytics Workspace Id")
+ .description("Log Analytics Workspace Id")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+ // Value sent in the Log-Type request header; optional, defaults to "nifimetrics".
+ static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
+ .name("Log Analytics Custom Log Name")
+ .description("Log Analytics Custom Log Name")
+ .required(false)
+ .defaultValue("nifimetrics")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+ // Base64-encoded shared key used to sign each request. Required and sensitive.
+ static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
+ .name("Log Analytics Workspace Key")
+ .description("Azure Log Analytic Worskspace Key")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+ // Required, defaults to "nifi".
+ // NOTE(review): this property is not read by onTrigger in this class - confirm whether it is still needed.
+ static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
+ .name("Application ID")
+ .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ // Instance identifier passed to every metric factory call; defaults to the host name expression.
+ static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
+ .name("Instance ID")
+ .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("${hostname(true)}")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ // Optional comma-separated list of process group IDs; each entry must match [0-9a-z-]+.
+ static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
+ .name("Process group ID(s)")
+ .description("If specified, the reporting task will send metrics the configured ProcessGroup(s) only. Multiple IDs should be separated by a comma. If"
+ + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used and global metrics are sent.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators
+ .createListValidator(true, true, StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
+ .build();
+ // Defaults to "nifi_reporting_job".
+ // NOTE(review): this property is not read by onTrigger in this class - confirm whether it is still needed.
+ static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
+ .name("Job Name")
+ .description("The name of the exporting job")
+ .defaultValue("nifi_reporting_job")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ // Boolean switch ("true"/"false", default "false") controlling whether JVM metrics are added.
+ static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
+ .name("Send JVM Metrics")
+ .description("Send JVM Metrics in addition to the NiFi-metrics")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+ // java.text.MessageFormat pattern for the endpoint URL; {0} is replaced with the workspace id.
+ static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
+ .name("Log Analytics URL Endpoint Format")
+ .description("Log Analytics URL Endpoint Format")
+ .required(false)
+ .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ private String createAuthorization(String workspaceId, String key, int contentLength, String rfc1123Date) {
+ try {
+ // Documentation: https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
+ String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, rfc1123Date);
+ Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
+ mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
+ String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
+ return String.format("SharedKey %s:%s", workspaceId, hmac);
+ } catch (NoSuchAlgorithmException | InvalidKeyException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(LOG_ANALYTICS_WORKSPACE_ID);
+ properties.add(LOG_ANALYTICS_CUSTOM_LOG_NAME);
+ properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
+ properties.add(APPLICATION_ID);
+ properties.add(INSTANCE_ID);
+ properties.add(PROCESS_GROUP_IDS);
+ properties.add(JOB_NAME);
+ properties.add(SEND_JVM_METRICS);
+ properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+ return properties;
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context){
+ final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
+ final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
+ final boolean jvmMetricsCollected = context.getProperty(SEND_JVM_METRICS).asBoolean();
+
+ final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
+ final String instanceId = context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
+ final String groupIds = context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
+ final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
+ try {
+ List<Metric> allMetrics = null;
+ if(groupIds == null || groupIds.isEmpty()) {
+ ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+ String processGroupName = status.getName();
+ allMetrics = collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected);
+ } else {
+ allMetrics = new ArrayList<>();
+ for(String groupId: groupIds.split(",")) {
+ groupId =groupId.trim();
+ ProcessGroupStatus status = context.getEventAccess().getGroupStatus(groupId);
+ String processGroupName = status.getName();
+ allMetrics.addAll(collectMetrics(instanceId, status, processGroupName, jvmMetricsCollected));
+ }
+ }
+ HttpPost httpPost = getHttpPost(urlEndpointFormat, workspaceId, logName);
+ sendMetrics(httpPost, workspaceId, linuxPrimaryKey, allMetrics);
+ } catch (Exception e) {
+ getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
+ }
+ }
+
+ /**
+ * Construct HttpPost and return it
+ * @param urlFormat URL format to Azure Log Analytics Endpoint
+ * @param workspaceId your azure log analytics workspace id
+ * @param logName log table name where metrics will be pushed
+ * @return HttpsURLConnection to your azure log analytics workspace
+ * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
+ */
+ protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName)
+ throws IllegalArgumentException {
+ String dataCollectorEndpoint =
+ MessageFormat.format(urlFormat, workspaceId);
+ HttpPost post = new HttpPost(dataCollectorEndpoint);
+ post.addHeader("Content-Type", "application/json");
+ post.addHeader("Log-Type", logName);
+ return post;
+ }
+ /**
+  * Serializes the collected metrics to a JSON array, signs the request and posts it
+  * to the azure log analytics workspace.
+  *
+  * @param request HttpPost to Azure Log Analytics Endpoint
+  * @param workspaceId your azure log analytics workspace id
+  * @param linuxPrimaryKey your azure log analytics workspace key
+  * @param allMetrics collected metrics to be sent
+  * @throws IOException when there is an error in the https connection or in reading/writing to the connection
+  * @throws IllegalArgumentException when there is an exception in converting metrics to a json string with Gson.toJson
+  * @throws RuntimeException when the post fails with a status code other than 200
+  */
+ protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
+         throws IOException, IllegalArgumentException, RuntimeException {
+
+     // Serialize the whole list in one call so the result is a well-formed JSON array;
+     // appending a comma after every element would leave a trailing comma before ']'.
+     final Gson gson = new GsonBuilder().create();
+     final String rawJson = gson.toJson(allMetrics);
+
+     // The signed content length is the UTF-8 byte count, so the entity below must be
+     // encoded as UTF-8 too; otherwise non-ASCII names would be sent in another charset.
+     final int bodyLength = rawJson.getBytes(UTF8).length;
+     final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
+     final String createAuthorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
+     request.addHeader("Authorization", createAuthorization);
+     request.addHeader("x-ms-date", nowRfc1123);
+     request.setEntity(new StringEntity(rawJson, UTF8));
+     try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+         postRequest(httpClient, request);
+     }
+ }
+
+ /**
+ * post request with httpClient and httpPost
+ * @param httpClient HttpClient
+ * @param request HttpPost
+ * @throws IOException if httpClient.execute fails
+ * @throws RuntimeException if post request status return other than 200
+ */
+ protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
+ throws IOException, RuntimeException {
+
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ if(response != null && response.getStatusLine().getStatusCode() != 200) {
+ throw new RuntimeException(response.getStatusLine().toString());
+ }
+ }
+ }
+ /**
+  * Gathers every metric to be sent for one process group: the group's data-flow metrics,
+  * then the metrics of all connections and all processors in the group and its descendants,
+  * and finally - if requested - the JVM metrics.
+  *
+  * @param instanceId instance id
+  * @param status process group status
+  * @param processGroupName process group name
+  * @param jvmMetricsCollected whether we want to collect jvm metrics or not
+  * @return list of metrics collected
+  */
+ private List<Metric> collectMetrics(final String instanceId,
+         final ProcessGroupStatus status, final String processGroupName, final boolean jvmMetricsCollected) {
+     // dataflow process group level metrics come first
+     final List<Metric> collected = new ArrayList<>(AzureLogAnalyticsMetricsFactory.getDataFlowMetrics(status, instanceId));
+
+     // connection metrics, including those of nested process groups
+     final List<ConnectionStatus> connections = new ArrayList<>();
+     populateConnectionStatuses(status, connections);
+     connections.forEach(connection -> collected.addAll(
+         AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connection, instanceId, processGroupName)));
+
+     // processor metrics, including those of nested process groups
+     final List<ProcessorStatus> processors = new ArrayList<>();
+     populateProcessorStatuses(status, processors);
+     processors.forEach(processor -> collected.addAll(
+         AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processor, instanceId, processGroupName)));
+
+     if (!jvmMetricsCollected) {
+         return collected;
+     }
+     collected.addAll(AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, instanceId, JVM_JOB_NAME));
+     return collected;
+ }
+
+ /**
+  * Appends the processor statuses of the given group to {@code statuses}, then recurses
+  * into each child group, so a parent's processors precede those of its children.
+  *
+  * @param groupStatus group whose processors are collected
+  * @param statuses    list receiving the processor statuses
+  */
+ private void populateProcessorStatuses(final ProcessGroupStatus groupStatus, final List<ProcessorStatus> statuses) {
+     statuses.addAll(groupStatus.getProcessorStatus());
+     groupStatus.getProcessGroupStatus()
+         .forEach(child -> populateProcessorStatuses(child, statuses));
+ }
+
+ /**
+  * Appends the connection statuses of the given group to {@code statuses}, then recurses
+  * into each child group, so a parent's connections precede those of its children.
+  *
+  * @param groupStatus group whose connections are collected
+  * @param statuses    list receiving the connection statuses
+  */
+ private void populateConnectionStatuses(final ProcessGroupStatus groupStatus, final List<ConnectionStatus> statuses) {
+     statuses.addAll(groupStatus.getConnectionStatus());
+     groupStatus.getProcessGroupStatus()
+         .forEach(child -> populateConnectionStatuses(child, statuses));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/Metric.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/Metric.java
new file mode 100644
index 0000000..afd35d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/Metric.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * One metric record as it is serialized to JSON for Azure Log Analytics.
+ * The {@code @SerializedName} values define the JSON property names.
+ */
+public class Metric {
+    public static final String CATEGORY_DATAFLOW = "DataFlow";
+    public static final String CATEGORY_CONNECTIONS = "Connections";
+    public static final String CATEGORY_PROCESSOR = "Processor";
+    public static final String CATEGORY_JVM = "JvmMetrics";
+
+    @SerializedName("Computer") String computer;
+    @SerializedName("ProcessGroupId") private String processGroupId;
+    @SerializedName("ProcessGroupName") private String processGroupName;
+    @SerializedName("ProcessorId") private String processorId;
+    @SerializedName("ProcessorName") private String processorName;
+    @SerializedName("Count") private Long count;
+    @SerializedName("Name") private String name;
+    @SerializedName("CategoryName") private String categoryName;
+    @SerializedName("Tags") private String tags;
+
+
+    /**
+     * Creates a metric for the given instance and process group; all other fields start as null.
+     *
+     * @param instanceId       value stored in the "Computer" property
+     * @param processGroupId   id of the process group
+     * @param processGroupName name of the process group
+     */
+    public Metric(String instanceId, String processGroupId, String processGroupName ) {
+        this.computer = instanceId;
+        this.processGroupName = processGroupName;
+        this.processGroupId = processGroupId;
+    }
+
+    public void setCount(long value){
+        this.count = Long.valueOf(value);
+    }
+    /**
+     * Stores the value as a whole number; the fractional part is discarded by the cast to long.
+     *
+     * @param value value to store
+     */
+    public void setCount(double value){
+        this.count = Long.valueOf((long)value);
+    }
+    public void setCount(int value){
+        this.count = Long.valueOf(value);
+    }
+
+    /**
+     * @return the stored count, or null if no count has been set
+     */
+    public Long getCount() {
+        return this.count;
+    }
+
+    public String getComputer() {
+        return computer;
+    }
+    public void setComputer(String computer) {
+        this.computer = computer;
+    }
+    /**
+     * Misspelled variant kept so existing callers keep compiling.
+     *
+     * @param computer value stored in the "Computer" property
+     * @deprecated use {@link #setComputer(String)}
+     */
+    @Deprecated
+    public void setCoumputer(String computer) {
+        setComputer(computer);
+    }
+
+    public String getProcessGroupId() {
+        return processGroupId;
+    }
+    public void setProcessGroupId(String processGroupId) {
+        this.processGroupId = processGroupId;
+    }
+
+    public String getProcessGroupName() {
+        return processGroupName;
+    }
+    public void setProcessGroupName(String processGroupName) {
+        this.processGroupName = processGroupName;
+    }
+
+    public String getProcessorId() {
+        return processorId;
+    }
+    public void setProcessorId(String processorId) {
+        this.processorId = processorId;
+    }
+
+    public String getProcessorName() {
+        return processorName;
+    }
+    public void setProcessorName(String processorName) {
+        this.processorName = processorName;
+    }
+
+    public String getName() {
+        return name;
+    }
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getCategoryName() {
+        return categoryName;
+    }
+    public void setCategoryName(String categoryName) {
+        this.categoryName = categoryName;
+    }
+
+    public String getTags() {
+        return tags;
+    }
+    public void setTags(String tags) {
+        this.tags = tags;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricNames.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricNames.java
new file mode 100644
index 0000000..6b56380
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricNames.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+/**
+ * The Metric names to send to Azure Log Analytics.
+ * Every member is an implicitly public, static and final String constant.
+ */
+public interface MetricNames {
+
+ // Metric Name separator
+ String METRIC_NAME_SEPARATOR = ".";
+
+ // NiFi Metrics (names are written in UpperCamelCase)
+ String FLOW_FILES_RECEIVED = "FlowFilesReceived";
+ String FLOW_FILES_TRANSFERRED = "FlowFilesTransferred";
+ String BYTES_RECEIVED = "BytesReceived";
+ String FLOW_FILES_SENT = "FlowFilesSent";
+ String BYTES_SENT = "BytesSent";
+ String FLOW_FILES_QUEUED = "FlowFilesQueued";
+ String BYTES_TRANSFERRED= "BytesTransferred";
+ String BYTES_QUEUED= "BytesQueued";
+ String BYTES_READ = "BytesRead";
+ String BYTES_WRITTEN = "BytesWritten";
+ String ACTIVE_THREADS = "ActiveThreads";
+ String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
+ String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+ String OUTPUT_CONTENT_SIZE = "OutputContentSize";
+ String INPUT_CONTENT_SIZE = "InputContentSize";
+ String QUEUED_CONTENT_SIZE = "QueuedContentSize";
+ String OUTPUT_COUNT = "OutputCount";
+ String INPUT_COUNT = "InputCount";
+ String QUEUED_COUNT = "QueuedCount";
+ String OUTPUT_BYTES = "OutputBytes";
+ String INPUT_BYTES = "InputBytes";
+ String QUEUED_BYTES = "QueuedBytes";
+
+ // JVM Metrics (names are dotted, lower-case, and all start with "jvm.")
+ String JVM_UPTIME = "jvm.uptime";
+ String JVM_HEAP_USED = "jvm.heap_used";
+ String JVM_HEAP_USAGE = "jvm.heap_usage";
+ String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+ String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+ String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+ String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+ String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+ String JVM_THREAD_COUNT = "jvm.thread_count";
+ String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+ String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+ String JVM_GC_RUNS = "jvm.gc.runs";
+ String JVM_GC_TIME = "jvm.gc.time";
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricsBuilder.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricsBuilder.java
new file mode 100644
index 0000000..5578290
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/MetricsBuilder.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Fluent builder that accumulates {@link Metric} entries sharing the same
+ * computer (instance id), category and process group, optionally decorated
+ * with a processor id/name and a tag string.
+ *
+ * <p>Each call to one of the {@code metric(...)} overloads appends one
+ * {@link Metric} to an internal list; {@link #build()} hands that list back.</p>
+ */
+public class MetricsBuilder{
+    private List<Metric> metrics = new ArrayList<>();
+
+    private String computer;
+    private String categoryName;
+    private String processGroupId;
+    private String processGroupName;
+    private String processorId;
+    private String processorName;
+    private boolean isProcessorMetric = false;
+    private String tags = null;
+
+
+    /**
+     * Creates a builder for the given category and process group.
+     *
+     * @param category         category name stamped on every metric; when it equals
+     *                         {@code Metric.CATEGORY_PROCESSOR} the processor id/name are
+     *                         copied onto every metric as well
+     * @param instanceId       value used as the metric's computer
+     * @param processGroupId   id of the process group the metrics belong to
+     * @param processGroupName name of the process group the metrics belong to
+     */
+    public MetricsBuilder(String category, String instanceId, String processGroupId, String processGroupName) {
+        this.computer = instanceId;
+        this.processGroupName = processGroupName;
+        this.processGroupId = processGroupId;
+        this.categoryName = category;
+        // Constant-first comparison: a null category produces a non-processor
+        // builder instead of a NullPointerException.
+        this.isProcessorMetric = Metric.CATEGORY_PROCESSOR.equals(category);
+    }
+
+    /**
+     * Creates a builder as {@link #MetricsBuilder(String, String, String, String)} does
+     * and additionally records the processor identity.
+     *
+     * @param category         category name stamped on every metric
+     * @param instanceId       value used as the metric's computer
+     * @param processGroupId   id of the owning process group
+     * @param processGroupName name of the owning process group
+     * @param processorId      id of the processor
+     * @param processorName    name of the processor
+     */
+    public MetricsBuilder(String category, String instanceId, String processGroupId, String processGroupName, String processorId, String processorName) {
+        this(category, instanceId, processGroupId, processGroupName);
+        this.processorId = processorId;
+        this.processorName = processorName;
+    }
+
+    /**
+     * @param processorId processor id applied to metrics created after this call
+     * @return this builder
+     */
+    public MetricsBuilder setProcessorId(String processorId){
+        this.processorId = processorId;
+        return this;
+    }
+
+    /**
+     * @param processorName processor name applied to metrics created after this call
+     * @return this builder
+     */
+    public MetricsBuilder setProcessorName(String processorName){
+        this.processorName = processorName;
+        return this;
+    }
+
+    /**
+     * @param tags tag string applied to metrics created after this call; {@code null} means no tags are set
+     * @return this builder
+     */
+    public MetricsBuilder setTags(String tags) {
+        this.tags = tags;
+        return this;
+    }
+
+    /**
+     * Appends a metric with a {@code long} value.
+     *
+     * @param metricName name of the metric
+     * @param count      value of the metric
+     * @return this builder
+     */
+    public MetricsBuilder metric(String metricName, long count){
+        final Metric metric = newMetric(metricName);
+        metric.setCount(count);
+        metrics.add(metric);
+        return this;
+    }
+
+    /**
+     * Appends a metric with a {@code double} value.
+     *
+     * @param metricName name of the metric
+     * @param count      value of the metric
+     * @return this builder
+     */
+    public MetricsBuilder metric(String metricName, double count){
+        final Metric metric = newMetric(metricName);
+        metric.setCount(count);
+        metrics.add(metric);
+        return this;
+    }
+
+    /**
+     * Appends a metric with an {@code int} value.
+     *
+     * @param metricName name of the metric
+     * @param count      value of the metric
+     * @return this builder
+     */
+    public MetricsBuilder metric(String metricName, int count) {
+        final Metric metric = newMetric(metricName);
+        metric.setCount(count);
+        metrics.add(metric);
+        return this;
+    }
+
+    /**
+     * Creates a metric carrying everything except its value: computer, process
+     * group, category, name, and - when applicable - processor identity and tags.
+     * The typed {@code metric(...)} overloads set the value themselves so that the
+     * matching {@code Metric.setCount} overload is selected.
+     */
+    private Metric newMetric(String metricName) {
+        final Metric metric = new Metric(this.computer, this.processGroupId, this.processGroupName);
+        if (isProcessorMetric) {
+            metric.setProcessorId(this.processorId);
+            metric.setProcessorName(this.processorName);
+        }
+        metric.setCategoryName(this.categoryName);
+        metric.setName(metricName);
+        if (this.tags != null) {
+            metric.setTags(this.tags);
+        }
+        return metric;
+    }
+
+    /**
+     * @return the list of metrics accumulated so far (the builder's own list, not a copy)
+     */
+    public List<Metric> build() {
+        return metrics;
+    }
+
+    /**
+     * @return the list of metrics accumulated so far (the builder's own list, not a copy)
+     */
+    public List<Metric> getMetrics() {
+        return this.metrics;
+    }
+
+    public void setMetrics(List<Metric> metrics) {
+        this.metrics = metrics;
+    }
+
+    public String getComputer() {
+        return this.computer;
+    }
+
+    public void setComputer(String computer) {
+        this.computer = computer;
+    }
+
+    public String getCategoryName() {
+        return this.categoryName;
+    }
+
+    /**
+     * Replaces the category name only; whether processor identity is copied onto
+     * metrics is controlled separately via {@link #setIsProcessorMetric(boolean)}.
+     */
+    public void setCategoryName(String categoryName) {
+        this.categoryName = categoryName;
+    }
+
+    public String getProcessGroupId() {
+        return this.processGroupId;
+    }
+
+    public void setProcessGroupId(String processGroupId) {
+        this.processGroupId = processGroupId;
+    }
+
+    public String getProcessGroupName() {
+        return this.processGroupName;
+    }
+
+    public void setProcessGroupName(String processGroupName) {
+        this.processGroupName = processGroupName;
+    }
+
+    public String getProcessorId() {
+        return this.processorId;
+    }
+
+
+    public String getProcessorName() {
+        return this.processorName;
+    }
+
+
+    /**
+     * @return {@code true} when processor id/name are copied onto each created metric
+     */
+    public boolean isIsProcessorMetric() {
+        return this.isProcessorMetric;
+    }
+
+    /**
+     * @return same value as {@link #isIsProcessorMetric()}
+     */
+    public boolean getIsProcessorMetric() {
+        return this.isProcessorMetric;
+    }
+
+    public void setIsProcessorMetric(boolean isProcessorMetric) {
+        this.isProcessorMetric = isProcessorMetric;
+    }
+
+    public String getTags() {
+        return this.tags;
+    }
+
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/api/AzureLogAnalyticsMetricsFactory.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/api/AzureLogAnalyticsMetricsFactory.java
new file mode 100644
index 0000000..14c2596
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/api/AzureLogAnalyticsMetricsFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics.api;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.metrics.jvm.JvmMetrics;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.azure.loganalytics.MetricNames;
+import org.apache.nifi.reporting.azure.loganalytics.Metric;
+import org.apache.nifi.reporting.azure.loganalytics.MetricsBuilder;
+
+
+/**
+ * Static helpers that translate NiFi status snapshots and JVM statistics into
+ * lists of {@link Metric} built with {@link MetricsBuilder}.
+ */
+public class AzureLogAnalyticsMetricsFactory {
+
+    /**
+     * Builds the data-flow metrics of a process group.
+     *
+     * @param status     status of the process group; its id and name are stamped on every metric
+     * @param instanceId value used as the metric's computer
+     * @return metrics in the {@code Metric.CATEGORY_DATAFLOW} category
+     */
+    public static List<Metric> getDataFlowMetrics(ProcessGroupStatus status, String instanceId){
+
+        final String groupId = status.getId();
+        final String groupName = status.getName();
+        final long processingNanos = calculateProcessingNanos(status);
+        MetricsBuilder builder = new MetricsBuilder(Metric.CATEGORY_DATAFLOW, instanceId, groupId, groupName);
+
+        // build dataflow metrics
+        builder.metric(MetricNames.FLOW_FILES_RECEIVED, status.getFlowFilesReceived())
+            .metric(MetricNames.FLOW_FILES_SENT, status.getFlowFilesSent())
+            .metric(MetricNames.FLOW_FILES_QUEUED, status.getQueuedCount())
+            .metric(MetricNames.BYTES_RECEIVED, status.getBytesReceived())
+            .metric(MetricNames.BYTES_WRITTEN, status.getBytesWritten())
+            .metric(MetricNames.BYTES_READ, status.getBytesRead())
+            .metric(MetricNames.BYTES_SENT, status.getBytesSent())
+            .metric(MetricNames.BYTES_QUEUED, status.getQueuedContentSize())
+            .metric(MetricNames.ACTIVE_THREADS, status.getActiveThreadCount())
+            // The "...Seconds" metric must carry seconds, not the raw nanosecond total;
+            // the untruncated value is reported under the nanosecond metric name.
+            .metric(MetricNames.TOTAL_TASK_DURATION_SECONDS, TimeUnit.NANOSECONDS.toSeconds(processingNanos))
+            .metric(MetricNames.TOTAL_TASK_DURATION_NANOS, processingNanos);
+        return builder.build();
+    }
+
+
+    /**
+     * Builds the metrics of a single connection. Source, destination and
+     * connection name are encoded into the metric tags.
+     *
+     * @param status     status of the connection
+     * @param instanceId value used as the metric's computer
+     * @param groupName  name of the process group reported on the metrics
+     * @return metrics in the {@code Metric.CATEGORY_CONNECTIONS} category
+     */
+    public static List<Metric> getConnectionStatusMetrics(ConnectionStatus status, String instanceId, String groupName){
+
+        final String groupId = status.getGroupId();
+        final String tags = String.format(
+            "[source=%s][destination=%s][cname=%s]", status.getSourceName(), status.getDestinationName(),
+            status.getName());
+        MetricsBuilder builder = new MetricsBuilder(Metric.CATEGORY_CONNECTIONS, instanceId, groupId, groupName);
+
+        builder.setTags(tags)
+            .metric(MetricNames.INPUT_COUNT, status.getInputCount())
+            .metric(MetricNames.INPUT_BYTES, status.getInputBytes())
+            .metric(MetricNames.QUEUED_COUNT, status.getQueuedCount())
+            .metric(MetricNames.QUEUED_BYTES, status.getQueuedBytes())
+            .metric(MetricNames.OUTPUT_COUNT, status.getOutputCount())
+            .metric(MetricNames.OUTPUT_BYTES, status.getOutputBytes());
+
+        return builder.build();
+    }
+
+    /**
+     * Builds the metrics of a single processor.
+     *
+     * @param status     status of the processor; its id and name are stamped on every metric
+     * @param instanceId value used as the metric's computer
+     * @param groupName  name of the process group reported on the metrics
+     * @return metrics in the {@code Metric.CATEGORY_PROCESSOR} category
+     */
+    public static List<Metric> getProcessorMetrics(ProcessorStatus status, String instanceId, String groupName){
+
+        final long processingNanos = status.getProcessingNanos();
+        MetricsBuilder builder = new MetricsBuilder(Metric.CATEGORY_PROCESSOR, instanceId, status.getGroupId(), groupName);
+
+        builder.setProcessorId(status.getId())
+            .setProcessorName(status.getName())
+            .metric(MetricNames.FLOW_FILES_RECEIVED, status.getInputCount())
+            .metric(MetricNames.FLOW_FILES_SENT, status.getOutputCount())
+            .metric(MetricNames.BYTES_READ, status.getInputBytes())
+            .metric(MetricNames.BYTES_WRITTEN, status.getOutputBytes())
+            .metric(MetricNames.ACTIVE_THREADS, status.getActiveThreadCount())
+            // Report seconds under the "...Seconds" name and keep the raw value under the nanosecond name.
+            .metric(MetricNames.TOTAL_TASK_DURATION_SECONDS, TimeUnit.NANOSECONDS.toSeconds(processingNanos))
+            .metric(MetricNames.TOTAL_TASK_DURATION_NANOS, processingNanos);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds the JVM (virtual machine) metrics: heap, file descriptors, uptime,
+     * thread counts, per-collector GC statistics, thread-state percentages and
+     * memory-pool usage. The process group id on these metrics is the empty string.
+     *
+     * @param virtualMachineMetrics source of the JVM statistics
+     * @param instanceId            value used as the metric's computer
+     * @param groupName             name of the process group reported on the metrics
+     * @return metrics in the {@code Metric.CATEGORY_JVM} category
+     */
+    public static List<Metric> getJvmMetrics(JvmMetrics virtualMachineMetrics, String instanceId, String groupName) {
+
+        MetricsBuilder builder = new MetricsBuilder(Metric.CATEGORY_JVM, instanceId, "", groupName);
+
+        builder.metric(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed(DataUnit.B))
+            .metric(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage())
+            .metric(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage())
+            .metric(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage())
+            .metric(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime())
+            .metric(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount())
+            .metric(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount());
+
+        // Append GC stats
+        virtualMachineMetrics.garbageCollectors()
+            .forEach((name, stat) -> {
+                final String suffix = toMetricNamePart(name);
+                builder.metric(MetricNames.JVM_GC_RUNS + "." + suffix, stat.getRuns())
+                    .metric(MetricNames.JVM_GC_TIME + "." + suffix, stat.getTime(TimeUnit.MILLISECONDS));
+            });
+
+        // Append thread states
+        virtualMachineMetrics.threadStatePercentages()
+            .forEach((state, usage) -> {
+                builder.metric("jvm.thread_states." + toMetricNamePart(state.name()), usage);
+            });
+
+        // Append pool stats
+        virtualMachineMetrics.memoryPoolUsage()
+            .forEach((name, usage) -> {
+                builder.metric("jvm.mem_pool_" + toMetricNamePart(name), usage);
+            });
+
+        return builder.build();
+
+    }
+
+    /**
+     * Lower-cases a raw name and replaces every whitespace character with an
+     * underscore. {@code Locale.ROOT} keeps the result independent of the JVM's
+     * default locale (e.g. no dotless "i" under a Turkish locale).
+     */
+    private static String toMetricNamePart(final String rawName) {
+        return rawName.toLowerCase(java.util.Locale.ROOT).replaceAll("\\s", "_");
+    }
+
+    /**
+     * Sums the processing time, in nanoseconds, of every processor in the given
+     * group and, recursively, in all of its child groups.
+     *
+     * @param status status of the process group to total
+     * @return total processing time in nanoseconds
+     */
+    static long calculateProcessingNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+            nanos += procStats.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
+            nanos += calculateProcessingNanos(childGroupStatus);
+        }
+
+        return nanos;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000..bbb8692
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/docs/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/docs/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask/additionalDetails.html
new file mode 100644
index 0000000..8c7010e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/docs/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask/additionalDetails.html
@@ -0,0 +1,54 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>AzureLogAnalyticsReportingTask</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<h2>AzureLogAnalyticsReportingTask</h2>
+
+<p>This ReportingTask sends the following metrics to an Azure Log Analytics workspace:</p>
+<ul>
+ <li>FlowFilesReceivedLast5Minutes</li>
+ <li>BytesReceivedLast5Minutes</li>
+ <li>FlowFilesSentLast5Minutes</li>
+ <li>BytesSentLast5Minutes</li>
+ <li>FlowFilesQueued</li>
+ <li>BytesQueued</li>
+ <li>BytesRead</li>
+ <li>BytesWritten</li>
+ <li>ActiveThreads</li>
+ <li>TotalTaskDurationSeconds</li>
+ <li>jvm.uptime</li>
+ <li>jvm.heap_used</li>
+ <li>jvm.heap_usage</li>
+ <li>jvm.non_heap_usage</li>
+ <li>jvm.thread_states.runnable</li>
+ <li>jvm.thread_states.blocked</li>
+ <li>jvm.thread_states.timed_waiting</li>
+ <li>jvm.thread_states.terminated</li>
+ <li>jvm.thread_count</li>
+ <li>jvm.daemon_thread_count</li>
+ <li>jvm.file_descriptor_usage</li>
+ <li>jvm.gc.runs</li>
+ <li>jvm.gc.time</li>
+</ul>
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsReportingTask.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsReportingTask.java
new file mode 100644
index 0000000..b8c4c17
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsReportingTask.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockComponentLog;
+
+import org.apache.nifi.util.MockReportingContext;
+import org.apache.nifi.util.MockReportingInitializationContext;
+import org.apache.nifi.util.MockVariableRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/**
+ * Exercises {@link AzureLogAnalyticsReportingTask#onTrigger} against mock
+ * reporting contexts. A testable subclass captures the metrics handed to
+ * {@code sendMetrics} and substitutes Mockito mocks for the HTTP request and client.
+ */
+public class TestAzureLogAnalyticsReportingTask {
+
+    private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";
+    private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name";
+    private static final String TEST_TASK_ID = "test-azureloganalyticsreportingtask-id";
+    private static final String MOCK_KEY = "abcdefg";
+    private static final String TEST_GROUP1_ID= "testgpid1";
+    private static final String TEST_GROUP2_ID= "testgpid2";
+    private MockReportingInitializationContext reportingInitContextStub;
+    private MockReportingContext reportingContextStub;
+    private TestableAzureLogAnalyticsReportingTask testedReportingTask;
+    private ProcessGroupStatus rootGroupStatus;
+    private ProcessGroupStatus testGroupStatus;
+    private ProcessGroupStatus testGroupStatus2;
+    private ProcessorStatus procStatus;
+
+    @Before
+    public void setup() {
+        testedReportingTask = new TestableAzureLogAnalyticsReportingTask();
+        rootGroupStatus = newGroupStatus("1234", "root");
+        reportingInitContextStub = new MockReportingInitializationContext(TEST_INIT_CONTEXT_ID, TEST_INIT_CONTEXT_NAME,
+            new MockComponentLog(TEST_TASK_ID, testedReportingTask));
+
+        reportingContextStub = new MockReportingContext(Collections.emptyMap(),
+            new MockStateManager(testedReportingTask), new MockVariableRegistry());
+
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.INSTANCE_ID.getName(), TEST_TASK_ID);
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.LOG_ANALYTICS_WORKSPACE_ID.getName(), TEST_TASK_ID);
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.LOG_ANALYTICS_WORKSPACE_KEY.getName(), MOCK_KEY);
+
+        initProcessorStatuses();
+    }
+
+    /**
+     * Creates a process group status populated with the fixed sample values
+     * shared by every group used in these tests.
+     *
+     * @param id   id assigned to the group status
+     * @param name name assigned to the group status
+     * @return the populated status
+     */
+    private static ProcessGroupStatus newGroupStatus(final String id, final String name) {
+        final ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+        groupStatus.setId(id);
+        groupStatus.setFlowFilesReceived(5);
+        groupStatus.setBytesReceived(10000);
+        groupStatus.setFlowFilesSent(10);
+        groupStatus.setBytesSent(20000);
+        groupStatus.setQueuedCount(100);
+        groupStatus.setQueuedContentSize(1024L);
+        groupStatus.setBytesRead(60000L);
+        groupStatus.setBytesWritten(80000L);
+        groupStatus.setActiveThreadCount(5);
+        groupStatus.setName(name);
+        groupStatus.setFlowFilesTransferred(5);
+        groupStatus.setBytesTransferred(10000);
+        groupStatus.setOutputContentSize(1000L);
+        groupStatus.setInputContentSize(1000L);
+        groupStatus.setOutputCount(100);
+        groupStatus.setInputCount(1000);
+        return groupStatus;
+    }
+
+    /**
+     * Attaches one sample processor to the root group, directly and through one child group.
+     */
+    private void initProcessorStatuses() {
+        procStatus = new ProcessorStatus();
+        procStatus.setProcessingNanos(123456789);
+        procStatus.setInputCount(2);
+        procStatus.setOutputCount(4);
+        procStatus.setActiveThreadCount(6);
+        procStatus.setBytesSent(1256);
+        procStatus.setName("sampleProcessor");
+        Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        rootGroupStatus.setProcessorStatus(processorStatuses);
+
+        ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+        groupStatus.setProcessorStatus(processorStatuses);
+
+        Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
+        groupStatuses.add(groupStatus);
+        rootGroupStatus.setProcessGroupStatus(groupStatuses);
+    }
+
+    private void initTestGroupStatuses() {
+        testGroupStatus = newGroupStatus(TEST_GROUP1_ID, TEST_GROUP1_ID);
+    }
+
+    private void initTestGroup2Statuses() {
+        testGroupStatus2 = newGroupStatus(TEST_GROUP2_ID, TEST_GROUP2_ID);
+    }
+
+    @Test
+    public void testOnTrigger() throws IOException, InterruptedException, InitializationException {
+        testedReportingTask.initialize(reportingInitContextStub);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+        testedReportingTask.onTrigger(reportingContextStub);
+
+        List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
+        TestVerification.assertDatatFlowMetrics(collectedMetrics);
+    }
+
+    @Test
+    public void testOnTriggerWithOnePG() throws IOException, InterruptedException, InitializationException {
+        initTestGroupStatuses();
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.PROCESS_GROUP_IDS.getName(), TEST_GROUP1_ID);
+        testedReportingTask.initialize(reportingInitContextStub);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP1_ID, testGroupStatus);
+        testedReportingTask.onTrigger(reportingContextStub);
+
+        List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
+        TestVerification.assertDatatFlowMetrics(collectedMetrics);
+    }
+
+    @Test
+    public void testOnTriggerWithPGList() throws IOException, InterruptedException, InitializationException {
+        initTestGroupStatuses();
+        initTestGroup2Statuses();
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.PROCESS_GROUP_IDS.getName(),
+            String.format("%s, %s", TEST_GROUP1_ID, TEST_GROUP2_ID));
+        testedReportingTask.initialize(reportingInitContextStub);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP1_ID, testGroupStatus);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP2_ID, testGroupStatus2);
+        testedReportingTask.onTrigger(reportingContextStub);
+
+        List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
+        TestVerification.assertDatatFlowMetrics(collectedMetrics);
+    }
+
+    @Test
+    public void testEmitJVMMetrics() throws IOException, InterruptedException, InitializationException {
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.SEND_JVM_METRICS.getName(), "true");
+        testedReportingTask.initialize(reportingInitContextStub);
+
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+        testedReportingTask.onTrigger(reportingContextStub);
+
+        List<Metric> collectedMetrics = testedReportingTask.getMetricsCollected();
+        TestVerification.assertJVMMetrics(collectedMetrics);
+    }
+
+    @Test
+    public void testAuthorization() throws IOException, InterruptedException, InitializationException {
+
+        reportingContextStub.setProperty(AzureLogAnalyticsReportingTask.SEND_JVM_METRICS.getName(), "true");
+        testedReportingTask.initialize(reportingInitContextStub);
+        reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
+        testedReportingTask.onTrigger(reportingContextStub);
+
+        // The mocked request must have received an Authorization header containing "SharedKey".
+        HttpPost postRequest = testedReportingTask.getPostRequest();
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        verify(postRequest, atLeast(1)).addHeader( eq("Authorization"), captor.capture());
+        assertTrue(captor.getValue().contains("SharedKey"));
+    }
+
+
+    /**
+     * Reporting task variant that records the metrics passed to {@code sendMetrics}
+     * and swaps the real HTTP request/client for Mockito mocks. Declared static
+     * because it needs no state from the enclosing test instance.
+     */
+    private static final class TestableAzureLogAnalyticsReportingTask extends AzureLogAnalyticsReportingTask {
+
+        private List<Metric> metricsCollected;
+        private HttpPost mockHttpPost;
+
+        @Override
+        protected void sendMetrics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+                final List<Metric> allMetrics) throws IOException{
+
+            // remember the most recent batch, then continue with the regular send path
+            metricsCollected = allMetrics;
+            super.sendMetrics(request, workspaceId, linuxPrimaryKey, allMetrics);
+        }
+
+        public List<Metric> getMetricsCollected() {
+            return metricsCollected;
+        }
+
+        @Override
+        protected HttpPost getHttpPost(final String urlFormat, final String workspaceId, final String logName) throws IllegalArgumentException {
+            mockHttpPost = Mockito.mock(HttpPost.class);
+            return mockHttpPost;
+        }
+
+        public HttpPost getPostRequest(){
+            return mockHttpPost;
+        }
+
+        @Override
+        protected void postRequest(final CloseableHttpClient httpClient, final HttpPost request)
+                throws IOException, RuntimeException {
+            // replace with mock httpclient and call base postRequest
+            CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class);
+            super.postRequest(mockClient, request);
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestMetricsFactory.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestMetricsFactory.java
new file mode 100644
index 0000000..724b6ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestMetricsFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.Gson;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.metrics.jvm.JvmMetrics;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AzureLogAnalyticsMetricsFactory} and for JSON serialization
+ * of {@link Metric} through Gson.
+ */
+public class TestMetricsFactory {
+
+    private ProcessGroupStatus status;
+    private final Gson gson = new Gson();
+
+    @Before
+    public void init() {
+        status = new ProcessGroupStatus();
+        status.setId("1234");
+        status.setFlowFilesReceived(5);
+        status.setBytesReceived(10000);
+        status.setFlowFilesSent(10);
+        status.setBytesSent(20000);
+        status.setQueuedCount(100);
+        status.setQueuedContentSize(1024L);
+        status.setBytesRead(60000L);
+        status.setBytesWritten(80000L);
+        status.setActiveThreadCount(5);
+    }
+
+    @Test
+    public void testGetDataFlowMetrics() {
+        ProcessorStatus procStatus = new ProcessorStatus();
+        List<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        status.setProcessorStatus(processorStatuses);
+
+        List<Metric> metrics = AzureLogAnalyticsMetricsFactory.getDataFlowMetrics(status, "testcase");
+        TestVerification.assertDatatFlowMetrics(metrics);
+    }
+
+    @Test
+    public void testGetVirtualMachineMetrics() {
+        JvmMetrics virtualMachineMetrics = JmxJvmMetrics.getInstance();
+        List<Metric> metrics = AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics, "testcase", "tests");
+        // Serialization of the collected metrics must succeed; assert on the
+        // result instead of dumping it to stdout.
+        final String metricsInString = gson.toJson(metrics);
+        org.junit.Assert.assertNotNull(metricsInString);
+        org.junit.Assert.assertFalse(metricsInString.isEmpty());
+        TestVerification.assertJVMMetrics(metrics);
+    }
+
+    @Test
+    public void testToJsonWithLongValue() {
+        Metric metric = new Metric("instanceId", "groupId", "groupName");
+        // NOTE(review): 0x7ff8000000000000L equals Double.doubleToLongBits(Double.NaN);
+        // presumably chosen to prove a large long count is serialized as a number
+        // rather than rejected - confirm against Metric.setCount(long).
+        metric.setCount(0x7ff8000000000000L);
+        final String json = gson.toJson(metric);
+        org.junit.Assert.assertNotNull(json);
+        org.junit.Assert.assertFalse(json.isEmpty());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestVerification.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestVerification.java
new file mode 100644
index 0000000..aa91e34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestVerification.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+/**
+ * Shared assertions that check a collected list of {@link Metric} objects for the
+ * expected metric names within a given category.
+ */
+public class TestVerification {
+
+    /** Category name expected on every data flow metric. */
+    private static final String CATEGORY_DATA_FLOW = "DataFlow";
+
+    /** Category name expected on every JVM metric. */
+    private static final String CATEGORY_JVM = "JvmMetrics";
+
+    /**
+     * Asserts that the collected metrics contain each expected data flow metric in the
+     * "DataFlow" category. The checks run in the listed order and stop at the first miss.
+     * <p>
+     * The misspelled method name is kept as-is because existing tests call it by this name.
+     *
+     * @param collectedMetrics the metrics to inspect
+     */
+    public static void assertDatatFlowMetrics(List<Metric> collectedMetrics) {
+        final String[] expectedNames = {
+            MetricNames.FLOW_FILES_RECEIVED,
+            MetricNames.BYTES_RECEIVED,
+            MetricNames.FLOW_FILES_SENT,
+            MetricNames.BYTES_SENT,
+            MetricNames.FLOW_FILES_QUEUED,
+            MetricNames.BYTES_QUEUED,
+            MetricNames.BYTES_READ,
+            MetricNames.BYTES_WRITTEN,
+            MetricNames.ACTIVE_THREADS
+        };
+        for (String expectedName : expectedNames) {
+            assertContainsMetric(collectedMetrics, expectedName, CATEGORY_DATA_FLOW);
+        }
+    }
+
+    /**
+     * Asserts that the collected metrics contain each expected JVM metric in the
+     * "JvmMetrics" category. The checks run in the listed order and stop at the first miss.
+     *
+     * @param collectedMetrics the metrics to inspect
+     */
+    public static void assertJVMMetrics(List<Metric> collectedMetrics) {
+        final String[] expectedNames = {
+            MetricNames.JVM_HEAP_USED,
+            MetricNames.JVM_NON_HEAP_USAGE,
+            MetricNames.JVM_THREAD_COUNT,
+            MetricNames.JVM_FILE_DESCRIPTOR_USAGE,
+            MetricNames.JVM_DAEMON_THREAD_COUNT,
+            MetricNames.JVM_THREAD_STATES_BLOCKED,
+            MetricNames.JVM_UPTIME,
+            MetricNames.JVM_HEAP_USAGE
+        };
+        for (String expectedName : expectedNames) {
+            assertContainsMetric(collectedMetrics, expectedName, CATEGORY_JVM);
+        }
+    }
+
+    /**
+     * Asserts that at least one metric has the given name and category, failing with a
+     * message that identifies the missing metric.
+     *
+     * @param collectedMetrics the metrics to inspect
+     * @param metricName       the metric name that must be present
+     * @param categoryName     the category the matching metric must report
+     */
+    private static void assertContainsMetric(List<Metric> collectedMetrics, String metricName, String categoryName) {
+        assertTrue(
+            "Expected metric '" + metricName + "' in category '" + categoryName + "' was not collected",
+            collectedMetrics.stream().anyMatch(
+                o -> o.getName().equals(metricName) && o.getCategoryName().equals(categoryName)));
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index 4768a4b..bdca8b4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -31,6 +31,7 @@
<modules>
<module>nifi-azure-processors</module>
+ <module>nifi-azure-reporting-task</module>
<module>nifi-azure-nar</module>
<module>nifi-azure-services-api</module>
<module>nifi-azure-services-api-nar</module>