You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/23 16:19:02 UTC

[jira] [Commented] (NIFI-1705) AttributesToCSV

    [ https://issues.apache.org/jira/browse/NIFI-1705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448380#comment-16448380 ] 

ASF GitHub Bot commented on NIFI-1705:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1589#discussion_r183453942
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.lang3.StringEscapeUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +
    +import java.io.BufferedOutputStream;
    +import java.io.OutputStream;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Collections;
    +import java.util.stream.Collectors;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"csv", "attributes", "flowfile"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " +
    +        "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content.  " +
    +        "If the attribute value contains a comma, newline or double quote, then the attribute value will be " +
    +        "escaped with double quotes.  Any double quote characters in the attribute value are escaped with " +
    +        "another double quote.  If the attribute value does not contain a comma, newline or double quote, then the " +
    +        "attribute value is returned unchanged.")
    +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes")
    +public class AttributesToCSV extends AbstractProcessor {
    +
    +    private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute";
    +    private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content";
    +    private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes";
    +    private static final String OUTPUT_SEPARATOR = ",";
    +    private static final String OUTPUT_MIME_TYPE = "text/csv";
    +
    +
    +    public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
    +            .name("attribute-list")
    +            .displayName("Attribute List")
    +            .description("Comma separated list of attributes to be included in the resulting CSV. If this value " +
    +                    "is left empty then all existing Attributes will be included. This list of attributes is " +
    +                    "case sensitive. If an attribute specified in the list is not found it will be emitted " +
    +                    "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " +
    +                    "If a core attribute is specified in this list " +
    +                    "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " +
    +                    "ALWAYS wins.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
    +            .name("destination")
    +            .displayName("Destination")
    +            .description("Control if CSV value is written as a new flowfile attribute 'CSVAttributes' " +
    +                    "or written in the flowfile content. Writing to flowfile content will overwrite any " +
    +                    "existing flowfile content.")
    +            .required(true)
    +            .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT)
    +            .defaultValue(OUTPUT_NEW_ATTRIBUTE)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
    +            .name("include-core-userSpecifiedAttributes")
    +            .displayName("Include Core Attributes")
    +            .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " +
    +                    "contained in every FlowFile, should be included in the final CSV value generated.  The Attribute List property " +
    +                    "overrides this setting.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
    +            .name(("null-value"))
    +            .displayName("Null Value")
    +            .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " +
    +                    "string will be placed in the CSV")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Successfully converted attributes to CSV").build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Failed to convert attributes to CSV").build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private volatile Set<String> userSpecifiedAttributes;
    +    private volatile Boolean includeCoreAttributes;
    +    private volatile Set<String> coreAttributes;
    +    private volatile boolean destinationContent;
    +    private volatile boolean nullValForEmptyString;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(ATTRIBUTES_LIST);
    +        properties.add(DESTINATION);
    +        properties.add(INCLUDE_CORE_ATTRIBUTES);
    +        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> userSpecifiedAttributes) {
    +        Map<String, String> result;
    +        if (!userSpecifiedAttributes.isEmpty()) {
    +            //the user gave a list of attributes
    +            result = new HashMap<>(userSpecifiedAttributes.size());
    +            for (String attribute : userSpecifiedAttributes) {
    +                String val = ff.getAttribute(attribute);
    +                if (val != null && !val.isEmpty()) {
    +                    result.put(attribute, val);
    +                } else {
    +                    if (nullValForEmptyString) {
    +                        result.put(attribute, "null");
    +                    } else {
    +                        result.put(attribute, "");
    +                    }
    +                }
    +            }
    +        } else {
    +            //the user did not give a list of attributes, take all the attributes from the flowfile
    +            Map<String, String> ffAttributes = ff.getAttributes();
    +            result = new HashMap<>(ffAttributes.size());
    +            for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
    +                    result.put(e.getKey(), e.getValue());
    +            }
    +        }
    +
    +        //now glue on the core attributes if the user wants them.
    +        if(includeCoreAttributes) {
    +            for (String coreAttribute : coreAttributes) {
    +                String val = ff.getAttribute(coreAttribute);
    +                //make sure this coreAttribute is applicable to this flowfile.
    +                if(ff.getAttributes().containsKey(coreAttribute)) {
    +                    if (val != null && !val.isEmpty()) {
    +                        result.put(coreAttribute, val);
    +                    } else {
    +                        if (nullValForEmptyString) {
    +                            result.put(coreAttribute, "null");
    +                        } else {
    +                            result.put(coreAttribute, "");
    +                        }
    +                    }
    +                }
    +            }
    +        } else {
    +            //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins
    +            for (String coreAttribute : coreAttributes) {
    +                //never override user specified attributes, even if the user has selected to exclude core attributes
    +                if(!userSpecifiedAttributes.contains(coreAttribute)) {
    +                    result.remove(coreAttribute);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    private Set<String> attributeListStringToSet(String attributeList) {
    +        //take the user specified attribute list string and convert to list of strings.
    +        Set<String> result = new HashSet<>();
    +        if (StringUtils.isNotBlank(attributeList)) {
    +            String[] ats = StringUtils.split(attributeList, OUTPUT_SEPARATOR);
    +            if (ats != null) {
    +                for (String str : ats) {
    +                    String trim = str.trim();
    +                        result.add(trim);
    +                }
    +            }
    +        }
    +        return result;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean();
    +        coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
    +        userSpecifiedAttributes = attributeListStringToSet(context.getProperty(ATTRIBUTES_LIST).getValue());
    +        destinationContent = OUTPUT_OVERWRITE_CONTENT.equals(context.getProperty(DESTINATION).getValue());
    +        nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
    +     }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final FlowFile original = session.get();
    +        if (original == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> atrList = buildAttributesMapForFlowFile(original, userSpecifiedAttributes);
    +
    +        //escape attribute values
    +        final StringBuilder sb = new StringBuilder();
    +        for (final String val : atrList.values()) {
    +            sb.append(StringEscapeUtils.escapeCsv(val));
    +            sb.append(OUTPUT_SEPARATOR);
    +        }
    +
    +        //check if the output separator is at the end of the string, if so then remove it
    +        if(sb.length() > 0 && sb.lastIndexOf(OUTPUT_SEPARATOR) == sb.length() -1) {
    --- End diff --
    
    I think you could eliminate this by adding `int index = 0;` after the declaration of `sb` and then doing:
    
    `sb.append(index++ < atrList.values().size() -1 ? OUTPUT_SEPARATOR : "")`


> AttributesToCSV
> ---------------
>
>                 Key: NIFI-1705
>                 URL: https://issues.apache.org/jira/browse/NIFI-1705
>             Project: Apache NiFi
>          Issue Type: Sub-task
>          Components: Extensions
>            Reporter: Randy Gelhausen
>            Priority: Major
>
> Create a new processor which converts a Flowfile's attributes into CSV content.
> Should support the same configuration options as the AttributesToJSON processor



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)