You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2022/03/04 13:31:38 UTC

[nifi] branch main updated: NIFI-9748: Added new property for Output Format to LogAttribute. Also made the FlowFile Properties (file size, entry date, lineage start date) optional and renamed from 'Standard FlowFile Attributes' to 'FlowFile Properties' because this has led to confusion many times in the past, around users wanting to reference these things as attributes via EL but they are not actually attributes.

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 65dd627  NIFI-9748: Added new property for Output Format to LogAttribute. Also made the FlowFile Properties (file size, entry date, lineage start date) optional and renamed from 'Standard FlowFile Attributes' to 'FlowFile Properties' because this has led to confusion many times in the past, around users wanting to reference these things as attributes via EL but they are not actually attributes.
65dd627 is described below

commit 65dd62716a62164f44be85657283103885f0f3ae
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Mar 2 13:18:28 2022 -0500

    NIFI-9748: Added new property for Output Format to LogAttribute. Also made the FlowFile Properties (file size, entry date, lineage start date) optional and renamed the section from 'Standard FlowFile Attributes' to 'FlowFile Properties'. The old name has led to confusion many times in the past: users wanted to reference these values as attributes via Expression Language (EL), but they are not actually attributes.
    
    This closes #5825
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../nifi/processors/standard/LogAttribute.java     | 112 +++++++++++++++------
 1 file changed, 80 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
index fb86305..c5773dc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
@@ -16,20 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -39,6 +25,7 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -53,6 +40,22 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.eclipse.jetty.util.StringUtil;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -60,6 +63,10 @@ import org.eclipse.jetty.util.StringUtil;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Emits attributes of the FlowFile at the specified log level")
 public class LogAttribute extends AbstractProcessor {
+    private static final AllowableValue OUTPUT_FORMAT_LINE_PER_ATTRIBUTE = new AllowableValue("Line per Attribute", "Line per Attribute", "Each FlowFile attribute will be logged using a single line" +
+        " for the attribute name and another line for the attribute value. This format is often most advantageous when looking at the attributes of a single FlowFile.");
+    private static final AllowableValue OUTPUT_FORMAT_SINGLE_LINE = new AllowableValue("Single Line", "Single Line", "All FlowFile attribute names and values will be logged on a single line. This " +
+        "format is often most advantageous when comparing logs from multiple FlowFiles.");
 
     public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
             .name("Log Level")
@@ -99,6 +106,14 @@ public class LogAttribute extends AbstractProcessor {
                     " There's an OR relationship between the two properties.")
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
+    public static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder()
+            .name("Output Format")
+            .displayName("Output Format")
+            .description("Specifies the format to use for logging FlowFile attributes")
+            .required(true)
+            .allowableValues(OUTPUT_FORMAT_LINE_PER_ATTRIBUTE, OUTPUT_FORMAT_SINGLE_LINE)
+            .defaultValue(OUTPUT_FORMAT_LINE_PER_ATTRIBUTE.getValue())
+            .build();
     public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder()
             .name("Log Payload")
             .required(true)
@@ -106,7 +121,14 @@ public class LogAttribute extends AbstractProcessor {
             .defaultValue("false")
             .allowableValues("true", "false")
             .build();
-
+    static final PropertyDescriptor LOG_FLOWFILE_PROPERTIES = new PropertyDescriptor.Builder()
+            .name("Log FlowFile Properties")
+            .displayName("Log FlowFile Properties")
+            .description("Specifies whether or not to log FlowFile \"properties\", such as Entry Date, Lineage Start Date, and content size")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
     public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
             .name("Log prefix")
             .required(false)
@@ -154,6 +176,8 @@ public class LogAttribute extends AbstractProcessor {
         supDescriptors.add(ATTRIBUTES_TO_LOG_REGEX);
         supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
         supDescriptors.add(ATTRIBUTES_TO_IGNORE_REGEX);
+        supDescriptors.add(LOG_FLOWFILE_PROPERTIES);
+        supDescriptors.add(OUTPUT_FORMAT);
         supDescriptors.add(LOG_PREFIX);
         supDescriptors.add(CHARSET);
         supportedDescriptors = Collections.unmodifiableList(supDescriptors);
@@ -171,7 +195,6 @@ public class LogAttribute extends AbstractProcessor {
 
     protected String processFlowFile(final ComponentLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
         final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
-        final ComponentLog LOG = getLogger();
         final String dashedLine;
 
         String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
@@ -188,21 +211,44 @@ public class LogAttribute extends AbstractProcessor {
             dashedLine = StringUtils.repeat('-', 5) + logPrefix + StringUtils.repeat('-', 5);
         }
 
+        final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
+        final boolean logProperties = context.getProperty(LOG_FLOWFILE_PROPERTIES).asBoolean();
+
         // Pretty print metadata
         final StringBuilder message = new StringBuilder();
         message.append("logging for flow file ").append(flowFile);
         message.append("\n");
-        message.append(dashedLine);
-        message.append("\nStandard FlowFile Attributes");
-        message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
-        message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
-        message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "fileSize", flowFile.getSize()));
-        message.append("\nFlowFile Attribute Map Content");
-        for (final String key : attributeKeys) {
-            message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
+
+        if (OUTPUT_FORMAT_LINE_PER_ATTRIBUTE.getValue().equalsIgnoreCase(outputFormat)) {
+            message.append(dashedLine);
+
+            if (logProperties) {
+                message.append("\nFlowFile Properties");
+                message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
+                message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "lineageStartDate", new Date(flowFile.getLineageStartDate())));
+                message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", "fileSize", flowFile.getSize()));
+            }
+
+            message.append("\nFlowFile Attribute Map Content");
+            for (final String key : attributeKeys) {
+                message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", key, flowFile.getAttribute(key)));
+            }
+            message.append("\n");
+            message.append(dashedLine);
+        } else {
+            final Map<String, String> attributes = new TreeMap<>();
+            for (final String key : attributeKeys) {
+                attributes.put(key, flowFile.getAttribute(key));
+            }
+
+            if (logProperties) {
+                attributes.put("entryDate", new Date(flowFile.getEntryDate()).toString());
+                attributes.put("lineageStartDate", new Date(flowFile.getLineageStartDate()).toString());
+                attributes.put("fileSize", String.valueOf(flowFile.getSize()));
+            }
+
+            message.append("FlowFile Properties: ").append(attributes);
         }
-        message.append("\n");
-        message.append(dashedLine);
 
         // The user can request to log the payload
         final boolean logPayload = context.getProperty(LOG_PAYLOAD).asBoolean();
@@ -220,22 +266,23 @@ public class LogAttribute extends AbstractProcessor {
         // Uses optional property to specify logging level
         switch (logLevel) {
             case info:
-                LOG.info(outputMessage);
+                logger.info(outputMessage);
                 break;
             case debug:
-                LOG.debug(outputMessage);
+                logger.debug(outputMessage);
                 break;
             case warn:
-                LOG.warn(outputMessage);
+                logger.warn(outputMessage);
                 break;
             case trace:
-                LOG.trace(outputMessage);
+                logger.trace(outputMessage);
                 break;
             case error:
-                LOG.error(outputMessage);
+                logger.error(outputMessage);
                 break;
             default:
-                LOG.debug(outputMessage);
+                logger.debug(outputMessage);
+                break;
         }
 
         return outputMessage;
@@ -337,4 +384,5 @@ public class LogAttribute extends AbstractProcessor {
         }
     }
 
+
 }