You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/23 16:19:02 UTC
[jira] [Commented] (NIFI-1705) AttributesToCSV
[ https://issues.apache.org/jira/browse/NIFI-1705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448380#comment-16448380 ]
ASF GitHub Bot commented on NIFI-1705:
--------------------------------------
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1589#discussion_r183453942
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"csv", "attributes", "flowfile"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " +
+ "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " +
+ "If the attribute value contains a comma, newline or double quote, then the attribute value will be " +
+ "escaped with double quotes. Any double quote characters in the attribute value are escaped with " +
+ "another double quote. If the attribute value does not contain a comma, newline or double quote, then the " +
+ "attribute value is returned unchanged.")
+@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes")
+public class AttributesToCSV extends AbstractProcessor {
+
+ private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute";
+ private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content";
+ private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes";
+ private static final String OUTPUT_SEPARATOR = ",";
+ private static final String OUTPUT_MIME_TYPE = "text/csv";
+
+
+ public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
+ .name("attribute-list")
+ .displayName("Attribute List")
+ .description("Comma separated list of attributes to be included in the resulting CSV. If this value " +
+ "is left empty then all existing Attributes will be included. This list of attributes is " +
+ "case sensitive. If an attribute specified in the list is not found it will be emitted " +
+ "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " +
+ "If a core attribute is specified in this list " +
+ "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " +
+ "ALWAYS wins.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+ .name("destination")
+ .displayName("Destination")
+ .description("Control if CSV value is written as a new flowfile attribute 'CSVAttributes' " +
+ "or written in the flowfile content. Writing to flowfile content will overwrite any " +
+ "existing flowfile content.")
+ .required(true)
+ .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT)
+ .defaultValue(OUTPUT_NEW_ATTRIBUTE)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
+ .name("include-core-userSpecifiedAttributes")
+ .displayName("Include Core Attributes")
+ .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " +
+ "contained in every FlowFile, should be included in the final CSV value generated. The Attribute List property " +
+ "overrides this setting.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
+ .name(("null-value"))
+ .displayName("Null Value")
+ .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " +
+ "string will be placed in the CSV")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("Successfully converted attributes to CSV").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("Failed to convert attributes to CSV").build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private volatile Set<String> userSpecifiedAttributes;
+ private volatile Boolean includeCoreAttributes;
+ private volatile Set<String> coreAttributes;
+ private volatile boolean destinationContent;
+ private volatile boolean nullValForEmptyString;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(ATTRIBUTES_LIST);
+ properties.add(DESTINATION);
+ properties.add(INCLUDE_CORE_ATTRIBUTES);
+ properties.add(NULL_VALUE_FOR_EMPTY_STRING);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> userSpecifiedAttributes) {
+ Map<String, String> result;
+ if (!userSpecifiedAttributes.isEmpty()) {
+ //the user gave a list of attributes
+ result = new HashMap<>(userSpecifiedAttributes.size());
+ for (String attribute : userSpecifiedAttributes) {
+ String val = ff.getAttribute(attribute);
+ if (val != null && !val.isEmpty()) {
+ result.put(attribute, val);
+ } else {
+ if (nullValForEmptyString) {
+ result.put(attribute, "null");
+ } else {
+ result.put(attribute, "");
+ }
+ }
+ }
+ } else {
+ //the user did not give a list of attributes, take all the attributes from the flowfile
+ Map<String, String> ffAttributes = ff.getAttributes();
+ result = new HashMap<>(ffAttributes.size());
+ for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
+ result.put(e.getKey(), e.getValue());
+ }
+ }
+
+ //now glue on the core attributes if the user wants them.
+ if(includeCoreAttributes) {
+ for (String coreAttribute : coreAttributes) {
+ String val = ff.getAttribute(coreAttribute);
+ //make sure this coreAttribute is applicable to this flowfile.
+ if(ff.getAttributes().containsKey(coreAttribute)) {
+ if (val != null && !val.isEmpty()) {
+ result.put(coreAttribute, val);
+ } else {
+ if (nullValForEmptyString) {
+ result.put(coreAttribute, "null");
+ } else {
+ result.put(coreAttribute, "");
+ }
+ }
+ }
+ }
+ } else {
+ //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins
+ for (String coreAttribute : coreAttributes) {
+ //never override user specified attributes, even if the user has selected to exclude core attributes
+ if(!userSpecifiedAttributes.contains(coreAttribute)) {
+ result.remove(coreAttribute);
+ }
+ }
+ }
+ return result;
+ }
+
+ private Set<String> attributeListStringToSet(String attributeList) {
+ //take the user specified attribute list string and convert to list of strings.
+ Set<String> result = new HashSet<>();
+ if (StringUtils.isNotBlank(attributeList)) {
+ String[] ats = StringUtils.split(attributeList, OUTPUT_SEPARATOR);
+ if (ats != null) {
+ for (String str : ats) {
+ String trim = str.trim();
+ result.add(trim);
+ }
+ }
+ }
+ return result;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean();
+ coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+ userSpecifiedAttributes = attributeListStringToSet(context.getProperty(ATTRIBUTES_LIST).getValue());
+ destinationContent = OUTPUT_OVERWRITE_CONTENT.equals(context.getProperty(DESTINATION).getValue());
+ nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+
+ final Map<String, String> atrList = buildAttributesMapForFlowFile(original, userSpecifiedAttributes);
+
+ //escape attribute values
+ final StringBuilder sb = new StringBuilder();
+ for (final String val : atrList.values()) {
+ sb.append(StringEscapeUtils.escapeCsv(val));
+ sb.append(OUTPUT_SEPARATOR);
+ }
+
+ //check if the output separator is at the end of the string, if so then remove it
+ if(sb.length() > 0 && sb.lastIndexOf(OUTPUT_SEPARATOR) == sb.length() -1) {
--- End diff --
I think you could eliminate this by adding `int index = 0;` after the declaration of `sb` and then doing:
`sb.append(index++ < atrList.values().size() -1 ? OUTPUT_SEPARATOR : "")`
> AttributesToCSV
> ---------------
>
> Key: NIFI-1705
> URL: https://issues.apache.org/jira/browse/NIFI-1705
> Project: Apache NiFi
> Issue Type: Sub-task
> Components: Extensions
> Reporter: Randy Gelhausen
> Priority: Major
>
> Create a new processor which converts a Flowfile's attributes into CSV content.
> Should support the same configuration options as the AttributesToJSON processor
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)