You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/07 19:04:05 UTC
nifi git commit: Squashed commit of the following:
Repository: nifi
Updated Branches:
refs/heads/master c412445a1 -> f5364875e
Squashed commit of the following:
commit 9c31e45d3f94bf1e73c87379f315f6559e6a23f4
Author: Joe Trite <jo...@gmail.com>
Date: Mon Mar 13 07:55:19 2017 -0400
NIFI-1705 Adding AttributesToCSV processor
commit 5e9afa9ccf4276c7a2318b761ded77fd514a60d9
Merge: 3177eb185 74cbfc4b6
Author: Joe Trite <jo...@gmail.com>
Date: Sat Mar 4 08:12:39 2017 -0500
Merge remote-tracking branch 'origin/master'
# Conflicts:
# nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
commit 3177eb1852dbc7e70c250c53d038a5bcc5cfa3c5
Author: Joe Trite <jo...@gmail.com>
Date: Sat Mar 4 08:09:06 2017 -0500
NIFI-3497 Converted properties to use displayName, changed validator on demarcator property and created a method for duplicate code.
commit 74cbfc4b69ffde1ddb4442b91e160b66ea3c8b6b
Merge: a974f78e0 f8cad0f8c
Author: Joe Trite <jo...@gmail.com>
Date: Sat Mar 4 07:47:46 2017 -0500
Merge branch 'master' into master
commit a974f78e033885455cadd2cbffd7e387cbabc4d7
Author: Joe Trite <jo...@gmail.com>
Date: Sat Mar 4 07:43:02 2017 -0500
NIFI-3497 Converted properties to use displayName, changed validator on demarcator property and created a method for duplicate code.
commit 1bfaef8e87eaa6af84b1a80cb680e0a15eef2f8d
Merge: 65ed46de9 89ec68d14
Author: Joe Trite <jo...@gmail.com>
Date: Fri Mar 3 08:01:59 2017 -0500
Merge branch 'master' of https://github.com/joetrite/nifi
commit 65ed46de9a00518cfe06ecd69bd4f3bbd8d3e662
Author: Joe Trite <jo...@gmail.com>
Date: Fri Feb 24 18:09:36 2017 -0500
NIFI-3497 - fixing Pcontrib issues
commit c5d52cf6f0f16496d9ef83411770409c0b2f88d4
Author: Joe Trite <jo...@gmail.com>
Date: Thu Feb 23 10:19:01 2017 -0500
NIFI-3497 - excluding test files
Adding new test data files to exclude list.
commit b1959341138d3b5004979544fcea86ba36fe6ebb
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:48:10 2017 -0500
NIFI-3497 - New dictionary files for test
Adding new dictionary files to support metadata dictionary option.
commit e296268f39bf213a9e8eaa8298b26556c6efe278
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:46:13 2017 -0500
NIFI-3497 test cases for metadata updates
Adding test cases to support metadata option update.
commit de7e348e62c0f7c5fbd334106878ca6ac46935af
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:36:08 2017 -0500
NIFI-3497 - Added metadata option
Added an option to post additional metadata as new attributes if a match is found in the dictionary.
commit 89ec68d14bb34cbe65ff9a4d50ff5321fd4ec0ef
Author: Joe Trite <jo...@gmail.com>
Date: Fri Feb 24 18:09:36 2017 -0500
NIFI-3497 - fixing Pcontrib issues
commit d71426037b142da8ca04dae38952c164d1614806
Author: Joe Trite <jo...@gmail.com>
Date: Thu Feb 23 10:19:01 2017 -0500
NIFI-3497 - excluding test files
Adding new test data files to exclude list.
commit a7a7b6ace80380416c342809ce95a4f4087bb066
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:48:10 2017 -0500
NIFI-3497 - New dictionary files for test
Adding new dictionary files to support metadata dictionary option.
commit 8eb54a50193897cf564eb7d222aae35481168af4
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:46:13 2017 -0500
NIFI-3497 test cases for metadata updates
Adding test cases to support metadata option update.
commit f52e1f2a064b31f87d4165af6075716fa7d55046
Author: Joe Trite <jo...@gmail.com>
Date: Wed Feb 22 16:36:08 2017 -0500
NIFI-3497 - Added metadata option
Added an option to post additional metadata as new attributes if a match is found in the dictionary.
NIFI-1705 Adding AttributesToCSV processor.
NIFI-1705 Adding AttributesToCSV processor.
NIFI-1705 Adding AttributesToCSV processor.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2711
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f5364875
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f5364875
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f5364875
Branch: refs/heads/master
Commit: f5364875e82aa18aa157c0fedc152cf209110ba6
Parents: c412445
Author: Joe Trite <jo...@gmail.com>
Authored: Wed May 16 14:32:05 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Jun 7 14:48:53 2018 -0400
----------------------------------------------------------------------
.../processors/standard/AttributesToCSV.java | 335 ++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../standard/TestAttributesToCSV.java | 821 +++++++++++++++++++
3 files changed, 1157 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f5364875/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java
new file mode 100644
index 0000000..4859d41
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"csv", "attributes", "flowfile"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " +
+ "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " +
+ "If the attribute value contains a comma, newline or double quote, then the attribute value will be " +
+ "escaped with double quotes. Any double quote characters in the attribute value are escaped with " +
+ "another double quote.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "CSVSchema", description = "CSV representation of the Schema"),
+ @WritesAttribute(attribute = "CSVData", description = "CSV representation of Attributes")
+})
+
+public class AttributesToCSV extends AbstractProcessor {
+ private static final String DATA_ATTRIBUTE_NAME = "CSVData";
+ private static final String SCHEMA_ATTRIBUTE_NAME = "CSVSchema";
+ private static final String OUTPUT_SEPARATOR = ",";
+ private static final String OUTPUT_MIME_TYPE = "text/csv";
+ private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";
+
+ static final AllowableValue OUTPUT_OVERWRITE_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", "The resulting CSV string will be placed into the content of the flowfile." +
+ "Existing flowfile context will be overwritten. 'CSVData' will not be written to at all (neither null nor empty string).");
+ static final AllowableValue OUTPUT_NEW_ATTRIBUTE= new AllowableValue("flowfile-attribute", "flowfile-attribute", "The resulting CSV string will be placed into a new flowfile" +
+ " attribute named 'CSVData'. The content of the flowfile will not be changed.");
+
+ public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
+ .name("attribute-list")
+ .displayName("Attribute List")
+ .description("Comma separated list of attributes to be included in the resulting CSV. If this value " +
+ "is left empty then all existing Attributes will be included. This list of attributes is " +
+ "case sensitive and supports attribute names that contain commas. If an attribute specified in the list is not found it will be emitted " +
+ "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " +
+ "If a core attribute is specified in this list " +
+ "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " +
+ "ALWAYS wins.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
+ .name("attributes-regex")
+ .displayName("Attributes Regular Expression")
+ .description("Regular expression that will be evaluated against the flow file attributes to select "
+ + "the matching attributes. This property can be used in combination with the attributes "
+ + "list property. The final output will contain a combination of matches found in the ATTRIBUTE_LIST and ATTRIBUTE_REGEX.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+ .name("destination")
+ .displayName("Destination")
+ .description("Control if CSV value is written as a new flowfile attribute 'CSVData' " +
+ "or written in the flowfile content.")
+ .required(true)
+ .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT)
+ .defaultValue(OUTPUT_NEW_ATTRIBUTE.getDisplayName())
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
+ .name("include-core-attributes")
+ .displayName("Include Core Attributes")
+ .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " +
+ "contained in every FlowFile, should be included in the final CSV value generated. Core attributes " +
+ "will be added to the end of the CSVData and CSVSchema strings. The Attribute List property " +
+ "overrides this setting.")
+ .required(true)
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
+ .name("null-value")
+ .displayName("Null Value")
+ .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " +
+ "string will be placed in the CSV")
+ .required(true)
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .build();
+ public static final PropertyDescriptor INCLUDE_SCHEMA = new PropertyDescriptor.Builder()
+ .name("include-schema")
+ .displayName("Include Schema")
+ .description("If true the schema (attribute names) will also be converted to a CSV string which will either be " +
+ "applied to a new attribute named 'CSVSchema' or applied at the first row in the " +
+ "content depending on the DESTINATION property setting.")
+ .required(true)
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("Successfully converted attributes to CSV").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("Failed to convert attributes to CSV").build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private volatile Boolean includeCoreAttributes;
+ private volatile Set<String> coreAttributes;
+ private volatile boolean destinationContent;
+ private volatile boolean nullValForEmptyString;
+ private volatile Pattern pattern;
+ private volatile Boolean includeSchema;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(ATTRIBUTES_LIST);
+ properties.add(ATTRIBUTES_REGEX);
+ properties.add(DESTINATION);
+ properties.add(INCLUDE_CORE_ATTRIBUTES);
+ properties.add(NULL_VALUE_FOR_EMPTY_STRING);
+ properties.add(INCLUDE_SCHEMA);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ private Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Pattern attPattern) {
+ Map<String, String> result;
+ Map<String, String> ffAttributes = ff.getAttributes();
+ result = new LinkedHashMap<>(ffAttributes.size());
+
+ if (!attributes.isEmpty() || attPattern != null) {
+ if (!attributes.isEmpty()) {
+ //the user gave a list of attributes
+ for (String attribute : attributes) {
+ String val = ff.getAttribute(attribute);
+ if (val != null && !val.isEmpty()) {
+ result.put(attribute, val);
+ } else {
+ if (nullValForEmptyString) {
+ result.put(attribute, "null");
+ } else {
+ result.put(attribute, "");
+ }
+ }
+ }
+ }
+ if(attPattern != null) {
+ for (Map.Entry<String, String> e : ff.getAttributes().entrySet()) {
+ if(attPattern.matcher(e.getKey()).matches()) {
+ result.put(e.getKey(), e.getValue());
+ }
+ }
+ }
+ } else {
+ //the user did not give a list of attributes, take all the attributes from the flowfile
+ result.putAll(ffAttributes);
+ }
+
+ //now glue on the core attributes if the user wants them.
+ if(includeCoreAttributes) {
+ for (String coreAttribute : coreAttributes) {
+ //make sure this coreAttribute is applicable to this flowfile.
+ String val = ff.getAttribute(coreAttribute);
+ if(ffAttributes.containsKey(coreAttribute)) {
+ if (!StringUtils.isEmpty(val)){
+ result.put(coreAttribute, val);
+ } else {
+ if (nullValForEmptyString) {
+ result.put(coreAttribute, "null");
+ } else {
+ result.put(coreAttribute, "");
+ }
+ }
+ }
+ }
+ } else {
+ //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins
+ for (String coreAttribute : coreAttributes) {
+ //never override user specified attributes, even if the user has selected to exclude core attributes
+ if(!attributes.contains(coreAttribute)) {
+ result.remove(coreAttribute);
+ }
+ }
+ }
+ return result;
+ }
+
+ private LinkedHashSet<String> attributeListStringToSet(String attributeList) {
+ //take the user specified attribute list string and convert to list of strings.
+ LinkedHashSet<String> result = new LinkedHashSet<>();
+ if (StringUtils.isNotBlank(attributeList)) {
+ String[] ats = attributeList.split(SPLIT_REGEX);
+ for (String str : ats) {
+ result.add(StringEscapeUtils.unescapeCsv(str.trim()));
+ }
+ }
+ return result;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean();
+ coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
+ destinationContent = OUTPUT_OVERWRITE_CONTENT.getValue().equals(context.getProperty(DESTINATION).getValue());
+ nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
+ includeSchema = context.getProperty(INCLUDE_SCHEMA).asBoolean();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+ if(context.getProperty(ATTRIBUTES_REGEX).isSet()) {
+ pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions(original).getValue());
+ }
+
+ final Set<String> attributeList = attributeListStringToSet(context.getProperty(ATTRIBUTES_LIST).evaluateAttributeExpressions(original).getValue());
+ final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributeList, pattern);
+
+ //escape attribute values
+ int index = 0;
+ final int atrListSize = atrList.values().size() -1;
+ final StringBuilder sbValues = new StringBuilder();
+ for (final Map.Entry<String,String> attr : atrList.entrySet()) {
+ sbValues.append(StringEscapeUtils.escapeCsv(attr.getValue()));
+ sbValues.append(index++ < atrListSize ? OUTPUT_SEPARATOR : "");
+ }
+
+ //build the csv header if needed
+ final StringBuilder sbNames = new StringBuilder();
+ if(includeSchema){
+ index = 0;
+ for (final Map.Entry<String,String> attr : atrList.entrySet()) {
+ sbNames.append(StringEscapeUtils.escapeCsv(attr.getKey()));
+ sbNames.append(index++ < atrListSize ? OUTPUT_SEPARATOR : "");
+ }
+ }
+
+ try {
+ if (destinationContent) {
+ FlowFile conFlowfile = session.write(original, (in, out) -> {
+ if(includeSchema){
+ sbNames.append(System.getProperty("line.separator"));
+ out.write(sbNames.toString().getBytes());
+ }
+ out.write(sbValues.toString().getBytes());
+ });
+ conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), OUTPUT_MIME_TYPE);
+ session.transfer(conFlowfile, REL_SUCCESS);
+ } else {
+ FlowFile atFlowfile = session.putAttribute(original, DATA_ATTRIBUTE_NAME , sbValues.toString());
+ if(includeSchema){
+ session.putAttribute(original, SCHEMA_ATTRIBUTE_NAME , sbNames.toString());
+ }
+ session.transfer(atFlowfile, REL_SUCCESS);
+ }
+ } catch (Exception e) {
+ getLogger().error(e.getMessage());
+ session.transfer(original, REL_FAILURE);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/f5364875/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a15ccf5..8195963 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.standard.AttributesToJSON
+org.apache.nifi.processors.standard.AttributesToCSV
org.apache.nifi.processors.standard.Base64EncodeContent
org.apache.nifi.processors.standard.CompressContent
org.apache.nifi.processors.standard.ControlRate
http://git-wip-us.apache.org/repos/asf/nifi/blob/f5364875/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java
new file mode 100644
index 0000000..457cbc7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import com.google.common.base.Splitter;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAttributesToCSV {
+
+ private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute";
+ private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content";
+ private static final String OUTPUT_ATTRIBUTE_NAME = "CSVData";
+ private static final String OUTPUT_SEPARATOR = ",";
+ private static final String OUTPUT_MIME_TYPE = "text/csv";
+ private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";
+ private static final String newline = System.getProperty("line.separator");
+
+ @Test
+ public void testAttrListNoCoreNullOffNewAttrToAttribute() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+ final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type";
+ testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData","");
+ }
+
+ @Test
+ public void testAttrListNoCoreNullOffNewAttrToContent() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ //set the destination of the csv string to be an attribute
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+ //use only one attribute, which does not exists, as the list of attributes to convert to csv
+ final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type";
+ testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData","");
+ }
+
+ @Test
+ public void testAttrListNoCoreNullOffTwoNewAttrToAttribute() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+ final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length";
+ testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData",",");
+ }
+
+ @Test
+ public void testAttrListNoCoreNullTwoNewAttrToAttribute() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true");
+
+ final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length";
+ testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY);
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData","null,null");
+ }
+
+ @Test
+ public void testNoAttrListNoCoreNullOffToAttribute() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ //set the destination of the csv string to be an attribute
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData","");
+ }
+
+ @Test
+ public void testNoAttrListNoCoreNullToAttribute() throws Exception {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true");
+ testRunner.enqueue(new byte[0]);
+ testRunner.run();
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
+ .assertAttributeExists("CSVData");
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+ testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
+ .get(0).assertAttributeEquals("CSVData","");
+ }
+
+
+ @Test
+ public void testNoAttrListCoreNullOffToContent() throws IOException {
+ final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+ testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT);
+ testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+ testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+ final Map<String, String> attrs = new HashMap<String, String>(){{
+ put("beach-name", "Malibu Beach");
+ put("beach-location", "California, US");
+ put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+ }};
+
+ testRunner.enqueue(new byte[0], attrs);
+ testRunner.run();
+
+ List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS);
+
+ testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+ testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = flowFilesForRelationship.get(0);
+
+ assertEquals(OUTPUT_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+ final byte[] contentData = testRunner.getContentAsByteArray(flowFile);
+
+ final String contentDataString = new String(contentData, "UTF-8");
+
+ Set<String> contentValues = new HashSet<>(getStrings(contentDataString));
+
+ assertEquals(6, contentValues.size());
+
+ assertTrue(contentValues.contains("Malibu Beach"));
+ assertTrue(contentValues.contains("\"California, US\""));
+ assertTrue(contentValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+ assertTrue(contentValues.contains(flowFile.getAttribute("filename")));
+ assertTrue(contentValues.contains(flowFile.getAttribute("path")));
+ assertTrue(contentValues.contains(flowFile.getAttribute("uuid")));
+ }
+
+    /**
+     * No attribute list, core attributes enabled, DESTINATION left unset.
+     * Expects the CSV in the OUTPUT_ATTRIBUTE_NAME attribute with six distinct values
+     * (three custom attributes plus filename, path and uuid) and no MIME type attribute.
+     */
+    @Test
+    public void testNoAttrListCoreNullOffToAttribute() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        // Content was not rewritten, so the MIME type attribute must be absent.
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(6, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        assertTrue(values.contains(result.getAttribute("filename")));
+        assertTrue(values.contains(result.getAttribute("path")));
+        assertTrue(values.contains(result.getAttribute("uuid")));
+    }
+
+    /**
+     * No attribute list, core attributes disabled, destination set to flow file content.
+     * Expects exactly the three custom attribute values in the content and the MIME type
+     * attribute set to OUTPUT_MIME_TYPE.
+     */
+    @Test
+    public void testNoAttrListNoCoreNullOffToContent() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+        assertEquals(OUTPUT_MIME_TYPE, result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final String csv = new String(runner.getContentAsByteArray(result), "UTF-8");
+        final Set<String> values = new HashSet<>(getStrings(csv));
+
+        // Only the three custom attributes are expected; core attributes are switched off.
+        assertEquals(3, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+    }
+
+
+    /**
+     * Explicit attribute list, core attributes disabled, destination set to a new attribute.
+     * Expects only the three listed attribute values in OUTPUT_ATTRIBUTE_NAME; the unlisted
+     * attribute on the flow file must not contribute a value.
+     */
+    @Test
+    public void testAttrListNoCoreNullOffToAttribute() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVAttribute!");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(3, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+    }
+
+    /**
+     * Explicit attribute list, core attributes enabled, destination set to a new attribute.
+     * Expects six distinct values in OUTPUT_ATTRIBUTE_NAME: the three listed attributes
+     * plus filename, path and uuid.
+     */
+    @Test
+    public void testAttrListCoreNullOffToAttribute() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(6, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        assertTrue(values.contains(result.getAttribute("filename")));
+        assertTrue(values.contains(result.getAttribute("path")));
+        assertTrue(values.contains(result.getAttribute("uuid")));
+    }
+
+    /**
+     * Core attributes disabled, but the attribute list names "uuid" explicitly.
+     * Expects four values in OUTPUT_ATTRIBUTE_NAME: the three custom attributes and the
+     * uuid, while filename and path stay excluded.
+     */
+    @Test
+    public void testAttrListNoCoreNullOffOverrideCoreByAttrListToAttribute() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement,uuid");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(4, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        // Only the core attribute named in the list (uuid) may appear.
+        assertTrue(!values.contains(result.getAttribute("filename")));
+        assertTrue(!values.contains(result.getAttribute("path")));
+        assertTrue(values.contains(result.getAttribute("uuid")));
+    }
+
+    /**
+     * Attribute list supplied through expression language ({@code ${myAttribs}}), core
+     * attributes enabled, destination set to a new attribute.
+     * Expects six distinct values in OUTPUT_ATTRIBUTE_NAME: the three attributes named by
+     * the evaluated expression plus filename, path and uuid.
+     *
+     * Only one flow file is enqueued, so it is verified exactly once. Attribute names that
+     * contain commas are exercised by testAttrListWithCommasInNameFromExpCoreNullOffToAttribute.
+     */
+    @Test
+    public void testAttrListFromExpCoreNullOffToAttribute() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+        testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}");
+        testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("beach-name", "Malibu Beach");
+        attrs.put("beach-location", "California, US");
+        attrs.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attrs.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+        attrs.put("myAttribs", "beach-name,beach-location,beach-endorsement");
+
+        testRunner.enqueue(new byte[0], attrs);
+        testRunner.run();
+
+        testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        // The single output flow file, with ATTRIBUTES_LIST populated from expression language.
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME);
+        final Set<String> csvDataValues = new HashSet<>(getStrings(attributeData));
+
+        assertEquals(6, csvDataValues.size());
+
+        assertTrue(csvDataValues.contains("Malibu Beach"));
+        assertTrue(csvDataValues.contains("\"California, US\""));
+        assertTrue(csvDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("filename")));
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("path")));
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("uuid")));
+    }
+
+    /**
+     * Attribute list supplied through expression language where every attribute name
+     * contains a comma and is therefore quoted inside the list value. Core attributes are
+     * enabled and the destination is a new attribute.
+     * Expects six distinct values in OUTPUT_ATTRIBUTE_NAME: the three comma-named
+     * attributes plus filename, path and uuid.
+     *
+     * Only one flow file is enqueued, so it is verified exactly once.
+     */
+    @Test
+    public void testAttrListWithCommasInNameFromExpCoreNullOffToAttribute() throws IOException {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+        testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}");
+        testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attrsCommaInName = new HashMap<>();
+        attrsCommaInName.put("beach,name", "Malibu Beach");
+        attrsCommaInName.put("beach,location", "California, US");
+        attrsCommaInName.put("beach,endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attrsCommaInName.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+        // Names containing commas are wrapped in double quotes within the list.
+        attrsCommaInName.put("myAttribs", "\"beach,name\",\"beach,location\",\"beach,endorsement\"");
+
+        testRunner.enqueue(new byte[0], attrsCommaInName);
+        testRunner.run();
+
+        testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        // The single output flow file, with ATTRIBUTES_LIST populated from expression language.
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME);
+        final Set<String> csvDataValues = new HashSet<>(getStrings(attributeData));
+
+        assertEquals(6, csvDataValues.size());
+
+        assertTrue(csvDataValues.contains("Malibu Beach"));
+        assertTrue(csvDataValues.contains("\"California, US\""));
+        assertTrue(csvDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("filename")));
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("path")));
+        assertTrue(csvDataValues.contains(flowFile.getAttribute("uuid")));
+    }
+
+
+    /**
+     * Attribute list supplied through expression language, core attributes disabled,
+     * destination set to a new attribute.
+     * Expects exactly the three listed attribute values in OUTPUT_ATTRIBUTE_NAME and none
+     * of filename, path or uuid.
+     */
+    @Test
+    public void testAttrListFromExpNoCoreNullOffOverrideCoreByAttrListToAttribute() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+        attributes.put("myAttribs", "beach-name,beach-location,beach-endorsement");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(3, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        // No core attribute was listed, so none may appear in the output.
+        assertTrue(!values.contains(result.getAttribute("filename")));
+        assertTrue(!values.contains(result.getAttribute("path")));
+        assertTrue(!values.contains(result.getAttribute("uuid")));
+    }
+
+    /**
+     * Attributes selected by a regular expression that is itself supplied through
+     * expression language ({@code ${myRegEx}} evaluating to "beach-.*"). Core attributes
+     * are disabled and the destination is a new attribute.
+     * Expects exactly the three "beach-" attribute values in OUTPUT_ATTRIBUTE_NAME.
+     */
+    @Test
+    public void testAttributesRegex() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "${myRegEx}");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+        attributes.put("myRegEx", "beach-.*");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(3, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+
+        assertTrue(!values.contains(result.getAttribute("filename")));
+        assertTrue(!values.contains(result.getAttribute("path")));
+        assertTrue(!values.contains(result.getAttribute("uuid")));
+    }
+
+    /**
+     * Attribute regex (from expression language) combined with an explicit attribute list.
+     * Core attributes are disabled and the destination is a new attribute.
+     * Expects five values in OUTPUT_ATTRIBUTE_NAME: the three "beach-" attributes matched
+     * by the regex plus the two listed attributes moreInfo1 and moreInfo2.
+     */
+    @Test
+    public void testAttributesRegexAndList() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "${myRegEx}");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "moreInfo1,moreInfo2");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+        attributes.put("myRegEx", "beach-.*");
+        attributes.put("moreInfo1", "A+ Rating");
+        attributes.put("moreInfo2", "Avg Temp: 61f");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+
+        assertNull(result.getAttribute(CoreAttributes.MIME_TYPE.key()));
+
+        final Set<String> values = new HashSet<>(getStrings(result.getAttribute(OUTPUT_ATTRIBUTE_NAME)));
+
+        assertEquals(5, values.size());
+
+        assertTrue(values.contains("Malibu Beach"));
+        assertTrue(values.contains("\"California, US\""));
+        assertTrue(values.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\""));
+        assertTrue(values.contains("A+ Rating"));
+        assertTrue(values.contains("Avg Temp: 61f"));
+
+        assertTrue(!values.contains(result.getAttribute("filename")));
+        assertTrue(!values.contains(result.getAttribute("path")));
+        assertTrue(!values.contains(result.getAttribute("uuid")));
+    }
+
+
+    /**
+     * Schema output enabled with the destination set to a new attribute.
+     * Expects the CSVData attribute to hold the values of the attributes matching
+     * "beach-.*" and the CSVSchema attribute to hold the corresponding attribute names.
+     */
+    @Test
+    public void testSchemaToAttribute() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+        testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+        testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true");
+        testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("beach-name", "Malibu Beach");
+        attrs.put("beach-location", "California, US");
+        attrs.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        testRunner.enqueue(new byte[0], attrs);
+        testRunner.run();
+
+        // Check the routing first so that a missing flow file fails with a clear assertion
+        // message instead of an IndexOutOfBoundsException from get(0) below.
+        testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+        flowFile.assertAttributeExists("CSVData");
+        flowFile.assertAttributeExists("CSVSchema");
+
+        flowFile.assertAttributeEquals("CSVData", "Malibu Beach,\"California, US\"");
+        flowFile.assertAttributeEquals("CSVSchema", "beach-name,beach-location");
+    }
+
+    /**
+     * Schema output enabled with the destination set to flow file content.
+     * Expects no CSVData/CSVSchema attributes; instead the first content line holds the
+     * attribute names matching "beach-.*" and the second line holds their values.
+     */
+    @Test
+    public void testSchemaToContent() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+        // Write the CSV (and schema) into the flow file content rather than attributes.
+        testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT);
+        testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
+        testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+        testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true");
+        testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("beach-name", "Malibu Beach");
+        attrs.put("beach-location", "California, US");
+        attrs.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        testRunner.enqueue(new byte[0], attrs);
+        testRunner.run();
+
+        testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+        flowFile.assertAttributeNotExists("CSVData");
+        flowFile.assertAttributeNotExists("CSVSchema");
+
+        final byte[] contentData = testRunner.getContentAsByteArray(flowFile);
+        final String contentDataString = new String(contentData, "UTF-8");
+
+        // Split once; line 0 is the schema header, line 1 is the data row.
+        final String[] contentLines = contentDataString.split(newline);
+
+        // assertEquals takes (expected, actual); keeping that order gives correct failure messages.
+        assertEquals("beach-name,beach-location", contentLines[0]);
+        assertEquals("Malibu Beach,\"California, US\"", contentLines[1]);
+    }
+
+
+    /**
+     * Schema output and core attributes both enabled, destination set to a new attribute.
+     * Expects CSVData to hold the "beach-" values followed by path, filename and uuid, and
+     * CSVSchema to hold the matching names in the same order.
+     */
+    @Test
+    public void testSchemaWithCoreAttribuesToAttribute() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new AttributesToCSV());
+        runner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
+        runner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        runner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+        runner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true");
+        runner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("beach-name", "Malibu Beach");
+        attributes.put("beach-location", "California, US");
+        attributes.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+        runner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+        final MockFlowFile result = runner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+        result.assertAttributeExists("CSVData");
+        result.assertAttributeExists("CSVSchema");
+
+        // Core attribute values are generated by the framework, so read them back
+        // from the flow file to build the expected data row.
+        final String path = result.getAttribute("path");
+        final String filename = result.getAttribute("filename");
+        final String uuid = result.getAttribute("uuid");
+        final String expectedData = "Malibu Beach,\"California, US\"," + path + "," + filename + "," + uuid;
+        final String expectedSchema = "beach-name,beach-location,path,filename,uuid";
+
+        result.assertAttributeEquals("CSVData", expectedData);
+        result.assertAttributeEquals("CSVSchema", expectedSchema);
+    }
+
+    /**
+     * Schema output and core attributes both enabled, destination set to flow file content.
+     * Expects no CSVData/CSVSchema attributes; the first content line holds the "beach-"
+     * attribute names followed by path, filename and uuid, and the second line holds the
+     * corresponding values.
+     */
+    @Test
+    public void testSchemaWithCoreAttribuesToContent() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
+        // Write the CSV (and schema) into the flow file content rather than attributes.
+        testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT);
+        testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true");
+        testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false");
+        testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true");
+        testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("beach-name", "Malibu Beach");
+        attrs.put("beach-location", "California, US");
+        attrs.put("attribute-should-be-eliminated", "This should not be in CSVData!");
+
+        testRunner.enqueue(new byte[0], attrs);
+        testRunner.run();
+
+        testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0);
+        flowFile.assertAttributeNotExists("CSVData");
+        flowFile.assertAttributeNotExists("CSVSchema");
+
+        // Core attribute values are read back from the flow file to build the expected row.
+        final String path = flowFile.getAttribute("path");
+        final String filename = flowFile.getAttribute("filename");
+        final String uuid = flowFile.getAttribute("uuid");
+
+        final byte[] contentData = testRunner.getContentAsByteArray(flowFile);
+        final String contentDataString = new String(contentData, "UTF-8");
+
+        // Split once; line 0 is the schema header, line 1 is the data row.
+        final String[] contentLines = contentDataString.split(newline);
+
+        // assertEquals takes (expected, actual); keeping that order gives correct failure messages.
+        assertEquals("beach-name,beach-location,path,filename,uuid", contentLines[0]);
+        assertEquals("Malibu Beach,\"California, US\"," + path + "," + filename + "," + uuid, contentLines[1]);
+    }
+ private List<String> getStrings(String sdata) {
+ return Splitter.on(Pattern.compile(SPLIT_REGEX)).splitToList(sdata);
+ }
+
+}