You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/12/17 23:53:15 UTC

[GitHub] [nifi] bahlulh commented on a change in pull request #3934: NIFI-6942: Add a reporting task to push provenance data to azure log analytics.

bahlulh commented on a change in pull request #3934: NIFI-6942: Add a reporting task to push provenance data to azure log analytics.
URL: https://github.com/apache/nifi/pull/3934#discussion_r359090100
 
 

 ##########
 File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
 ##########
 @@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+
+@Tags({ "azure", "provenace", "reporting", "log analytics" })
+@CapabilityDescription("Publishes Provenance events to to a Azure Log Analytics workspace.")
+public class AzureLogAnalyticsProvenanceReportingTask extends AbstractAzureLogAnalyticsReportingTask {
+
+        /** NOTE(review): not referenced in the methods shown; presumably the state key for the last sent event id — confirm or remove. */
+        protected static final String LAST_EVENT_ID_KEY = "last_event_id";
+        /** NOTE(review): not referenced in the methods shown — confirm whether it is still needed. */
+        protected static final String DESTINATION_URL_PATH = "/nifi";
+        /** SimpleDateFormat pattern used by GetStringData; the trailing 'Z' is a literal, so the formatter must be set to UTC. */
+        protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+        /** Start Position option (the default): begin with the oldest available provenance event. */
+        static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream",
+                        "Beginning of Stream",
+                        "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+
+        /** Start Position option: skip events that already exist and read only newer ones. */
+        static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream",
+                        "Start reading provenance Events from the end of the stream, ignoring old events");
+
+        // Include/exclude filters applied to the provenance consumer in CreateConsumer.
+        // NOTE(review): the property names reuse an "s2s-prov-task-" prefix; renaming them after release
+        // would presumably invalidate saved task configurations, so any rename should happen before merge.
+
+        /** Event types to include; entries are split on ',', stripped and parsed with ProvenanceEventType.valueOf. Invalid entries are logged and ignored. */
+        static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-event-filter").displayName("Event Type to Include")
+                        .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+                                        + "Available event types are "
+                                        + Arrays.deepToString(ProvenanceEventType.values())
+                                        + ". If no filter is set, all the events are sent. If "
+                                        + "multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /** Event types to exclude; parsed like FILTER_EVENT_TYPE and passed to addTargetEventTypeExclude. */
+        static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-event-filter-exclude").displayName("Event Type to Exclude")
+                        .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+                                        + "Available event types are "
+                                        + Arrays.deepToString(ProvenanceEventType.values())
+                                        + ". If no filter is set, all the events are sent. If "
+                                        + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+                                        + "exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /** Regular expression on component type; passed to the consumer via setComponentTypeRegex. */
+        static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-type-filter").displayName("Component Type to Include")
+                        .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+                                        + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        /** Regular expression on component type to exclude; passed via setComponentTypeRegexExclude. */
+        static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-type-filter-exclude").displayName("Component Type to Exclude")
+                        .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+                                        + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+                                        + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        /** Component UUIDs to include; split on ',', stripped and passed to addTargetComponentId. */
+        static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-id-filter").displayName("Component ID to Include")
+                        .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+                                        + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /** Component UUIDs to exclude; split on ',', stripped and passed to addTargetComponentIdExclude. */
+        static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-id-filter-exclude").displayName("Component ID to Exclude")
+                        .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+                                        + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+                                        + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /** Regular expression on component name; passed to the consumer via setComponentNameRegex. */
+        static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-name-filter").displayName("Component Name to Include")
+                        .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular "
+                                        + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        /** Regular expression on component name to exclude; passed via setComponentNameRegexExclude. */
+        static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
+                        .name("s2s-prov-task-name-filter-exclude").displayName("Component Name to Exclude")
+                        .description("Regular expression to exclude the provenance events based on the component name. The events matching the regular "
+                                        + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+                                        + "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+                        .required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+        /** Where to begin reading when no prior state exists; passed to ProvenanceEventConsumer.setStartPositionValue. Defaults to BEGINNING_OF_STREAM. */
+        static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("start-position")
+                        .displayName("Start Position")
+                        .description("If the Reporting Task has never been run, or if its state has been reset by a user, "
+                                        + "specifies where in the stream of Provenance Events the Reporting Task should start")
+                        .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+                        .defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
+
+        /** Read as a Boolean in GetStringData and forwarded to serialize for every event. */
+        static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder().name("include-null-values")
+                        .displayName("Include Null Values")
+                        .description("Indicate if null values should be included in records. Default will be false")
+                        .required(true).allowableValues("true", "false").defaultValue("false").build();
+
+        /**
+         * Platform value evaluated in GetStringData and passed to serialize for each event.
+         * NOTE(review): unlike the other descriptors, no displayName is set.
+         */
+        static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform")
+                        .description("The value to use for the platform field in each event.").required(true)
+                        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("nifi")
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /**
+         * Evaluated in GetStringData and parsed with java.net.URL; its host is sent as each event's hostname.
+         * Only a non-empty check is applied here, so a value that is not a valid URL makes GetStringData throw.
+         */
+        static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder().name("Instance URL")
+                        .displayName("Instance URL")
+                        .description("The URL of this instance to use in the Content URI of each event.").required(true)
+                        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                        .defaultValue("http://${hostname(true)}:8080/nifi")
+                        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+        /** Maximum events per batch; passed to ProvenanceEventConsumer.setBatchSize. */
+        static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size")
+                        .displayName("Batch Size")
+                        .description("Specifies how many records to send in a single batch, at most.").required(true)
+                        .defaultValue("1000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+        // NOTE(review): not read or assigned by the methods shown (they use their ReportingContext parameter); possibly dead — confirm.
+        private ConfigurationContext context;
+
+        // Created lazily by CreateConsumer on first use and reused afterwards; volatile so the published reference is visible across threads.
+        private volatile ProvenanceEventConsumer consumer;
+
+        /**
+         * Returns the properties supported by this reporting task. The list contains, in order,
+         * the Log Analytics workspace settings, then the provenance event filters, then the
+         * start position and output options.
+         *
+         * @return a new, mutable list of property descriptors
+         */
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+                final List<PropertyDescriptor> workspaceSettings = Arrays.asList(
+                                LOG_ANALYTICS_WORKSPACE_ID,
+                                LOG_ANALYTICS_CUSTOM_LOG_NAME,
+                                LOG_ANALYTICS_WORKSPACE_KEY,
+                                APPLICATION_ID,
+                                INSTANCE_ID,
+                                JOB_NAME,
+                                LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+                final List<PropertyDescriptor> eventFilters = Arrays.asList(
+                                FILTER_EVENT_TYPE,
+                                FILTER_EVENT_TYPE_EXCLUDE,
+                                FILTER_COMPONENT_TYPE,
+                                FILTER_COMPONENT_TYPE_EXCLUDE,
+                                FILTER_COMPONENT_ID,
+                                FILTER_COMPONENT_ID_EXCLUDE,
+                                FILTER_COMPONENT_NAME,
+                                FILTER_COMPONENT_NAME_EXCLUDE);
+                final List<PropertyDescriptor> outputOptions = Arrays.asList(
+                                START_POSITION,
+                                ALLOW_NULL_VALUES,
+                                PLATFORM,
+                                INSTANCE_URL,
+                                BATCH_SIZE);
+
+                final List<PropertyDescriptor> supported = new ArrayList<>(
+                                workspaceSettings.size() + eventFilters.size() + outputOptions.size());
+                supported.addAll(workspaceSettings);
+                supported.addAll(eventFilters);
+                supported.addAll(outputOptions);
+                return supported;
+        }
+
+        /**
+         * Creates and configures the provenance event consumer on first use. Later calls return
+         * immediately and keep the existing consumer.
+         * <p>
+         * The consumer is configured from the Start Position, Batch Size and all include/exclude
+         * filter properties. Event-type entries that are not valid {@link ProvenanceEventType}
+         * names are logged as warnings and skipped.
+         *
+         * @param context the reporting context whose properties configure the consumer
+         */
+        public void CreateConsumer(final ReportingContext context) {
+                if (consumer != null) {
+                        return;
+                }
+
+                // Configure a local instance and publish it to the field only once it is fully set up,
+                // so an exception part-way through cannot leave a half-configured consumer that the
+                // null check above would keep reusing.
+                final ProvenanceEventConsumer newConsumer = new ProvenanceEventConsumer();
+                newConsumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
+                newConsumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
+                newConsumer.setLogger(getLogger());
+
+                // component type and component name regex filtering
+                newConsumer.setComponentTypeRegex(
+                                context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
+                newConsumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE)
+                                .evaluateAttributeExpressions().getValue());
+                newConsumer.setComponentNameRegex(
+                                context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
+                newConsumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE)
+                                .evaluateAttributeExpressions().getValue());
+
+                // event type filtering: split on ',', strip whitespace, skip names that are not enum constants
+                final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(
+                                context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), ','));
+                if (targetEventTypes != null) {
+                        for (final String type : targetEventTypes) {
+                                try {
+                                        newConsumer.addTargetEventType(ProvenanceEventType.valueOf(type));
+                                } catch (final IllegalArgumentException e) {
+                                        getLogger().warn(type
+                                                        + " is not a correct event type, removed from the filtering.");
+                                }
+                        }
+                }
+
+                final String[] targetEventTypesExclude = StringUtils
+                                .stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE)
+                                                .evaluateAttributeExpressions().getValue(), ','));
+                if (targetEventTypesExclude != null) {
+                        for (final String type : targetEventTypesExclude) {
+                                try {
+                                        newConsumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+                                } catch (final IllegalArgumentException e) {
+                                        getLogger().warn(type
+                                                        + " is not a correct event type, removed from the exclude filtering.");
+                                }
+                        }
+                }
+
+                // component ID filtering
+                final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(
+                                context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(),
+                                ','));
+                if (targetComponentIds != null) {
+                        newConsumer.addTargetComponentId(targetComponentIds);
+                }
+
+                final String[] targetComponentIdsExclude = StringUtils
+                                .stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE)
+                                                .evaluateAttributeExpressions().getValue(), ','));
+                if (targetComponentIdsExclude != null) {
+                        newConsumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+                }
+
+                newConsumer.setScheduled(true);
+                consumer = newConsumer;
+        }
+
+        @Override
+        public void onTrigger(ReportingContext context) {
+                final boolean isClustered = context.isClustered();
+                final String nodeId = context.getClusterNodeIdentifier();
+                if (nodeId == null && isClustered) {
+                        getLogger().debug(
+                                        "This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                                                        + "Will wait for Node Identifier to be established.");
+                        return;
+                }
+
+                final String workspaceId = context.getProperty(LOG_ANALYTICS_WORKSPACE_ID)
+                                .evaluateAttributeExpressions().getValue();
+                final String linuxPrimaryKey = context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY)
+                                .evaluateAttributeExpressions().getValue();
+                final String logName = context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+                                .getValue();
+                final String urlEndpointFormat = context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+                                .evaluateAttributeExpressions().getValue();
+                try {
+                        final String dataCollectorEndpoint = MessageFormat.format(urlEndpointFormat, workspaceId);
+                        final HttpPost httpPost = new HttpPost(dataCollectorEndpoint);
+                        httpPost.addHeader("Content-Type", "application/json");
+                        httpPost.addHeader("Log-Type", logName);
+                        String str = GetStringData(context);
+                        sendToLogAnalytics(httpPost, workspaceId, linuxPrimaryKey, str);
+                } catch (final Exception e) {
+                        getLogger().error("Failed to publish metrics to Azure Log Analytics", e);
+                }
+        }
+
+        public String GetStringData(final ReportingContext context) {
+                final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
+                final String nodeId = context.getClusterNodeIdentifier();
+                final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
+                final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+                final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
+                final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+                URL url;
+                try {
+                        url = new URL(nifiUrl);
+                } catch (final MalformedURLException e1) {
+                        throw new AssertionError();
+                }
+
+                final String hostname = url.getHost();
+
+                final Map<String, Object> config = Collections.emptyMap();
+                final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+                final JsonObjectBuilder builder = factory.createObjectBuilder();
+                final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+                df.setTimeZone(TimeZone.getTimeZone("Z"));
+                StringBuilder sb = new StringBuilder();
+                sb.append('[');
+                CreateConsumer(context);
+                consumer.consumeEvents(context, (mapHolder, events) -> {
+                        for (final ProvenanceEventRecord event : events) {
+                                final String componentName = mapHolder.getComponentName(event.getComponentId());
+                                final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(),
+                                                event.getComponentType());
+                                final String processGroupName = mapHolder.getComponentName(processGroupId);
+                                JsonObject jo = serialize(factory, builder, event, df, componentName, processGroupId,
+                                                processGroupName, hostname, url, rootGroupName, platform, nodeId,
+                                                allowNullValues);
+                                sb.append(jo.toString());
+                                sb.append(',');
+                        }
+
+                });
+                sb.append(']');
+                return sb.toString();
+        }
+
+        private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder,
+                        final ProvenanceEventRecord event, final DateFormat df, final String componentName,
+                        final String processGroupId, final String processGroupName, final String hostname,
+                        final URL nifiUrl, final String applicationName, final String platform,
+                        final String nodeIdentifier, Boolean allowNullValues) {
+                addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
+                addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
 
 Review comment:
   OK. I will use event.getEventId() as eventId and remove eventOrdinal.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services