You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/09/16 15:42:24 UTC
nifi git commit: NIFI-1942 Processor to validate CSV against
user-supplied schema
Repository: nifi
Updated Branches:
refs/heads/master 022f5a506 -> d838f6129
NIFI-1942 Processor to validate CSV against user-supplied schema
This closes #476
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d838f612
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d838f612
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d838f612
Branch: refs/heads/master
Commit: d838f61291d2582592754a37314911b701c6891b
Parents: 022f5a5
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri May 27 10:05:16 2016 +0200
Committer: jpercivall <jo...@yahoo.com>
Committed: Fri Sep 16 11:13:07 2016 -0400
----------------------------------------------------------------------
.../nifi-standard-processors/pom.xml | 5 +
.../nifi/processors/standard/ValidateCsv.java | 618 +++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 109 ++++
.../processors/standard/TestValidateCsv.java | 255 ++++++++
5 files changed, 988 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 0c27b62..6fbc4d9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -244,6 +244,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-standard-utils</artifactId>
<version>1.1.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.supercsv</groupId>
+ <artifactId>super-csv</artifactId>
+ <version>2.4.0</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
new file mode 100644
index 0000000..4788080
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBigDecimal;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.ForbidSubStr;
+import org.supercsv.cellprocessor.constraint.IsIncludedIn;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.RequireHashCode;
+import org.supercsv.cellprocessor.constraint.RequireSubStr;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.constraint.UniqueHashCode;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"csv", "schema", "validation"})
+@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
+ "Take a look at the additional documentation of this processor for some schema examples.")
+public class ValidateCsv extends AbstractProcessor {
+
+ private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
+ "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
+ "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
+ "UniqueHashCode", "IsIncludedIn");
+
+ private static final String routeWholeFlowFile = "FlowFile validation";
+ private static final String routeLinesIndividually = "Line by line validation";
+
+ public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
+ "As soon as an error is found in the CSV file, the validation will stop and the whole flow file will be routed to the 'invalid'"
+ + " relationship. This option offers best performances.");
+
+ public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
+ "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
+ + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
+ + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition:"
+ + "the first occurrence will be considered valid and the next ones as invalid.");
+
+ public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+ .name("validate-csv-schema")
+ .displayName("Schema")
+ .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
+ + "processors to apply. The following cell processors are allowed in the schema definition: "
+ + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
+ .name("validate-csv-header")
+ .displayName("Header")
+ .description("True if the incoming flow file contains a header to ignore, false otherwise.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
+ .name("validate-csv-quote")
+ .displayName("Quote character")
+ .description("Character used as 'quote' in the incoming data. Example: \"")
+ .required(true)
+ .defaultValue("\"")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
+ .name("validate-csv-delimiter")
+ .displayName("Delimiter character")
+ .description("Character used as 'delimiter' in the incoming data. Example: ,")
+ .required(true)
+ .defaultValue(",")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
+ .name("validate-csv-eol")
+ .displayName("End of line symbols")
+ .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
+ .required(true)
+ .defaultValue("\\n")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("validate-csv-strategy")
+ .displayName("Validation strategy")
+ .description("Strategy to apply when routing input files to output relationships.")
+ .required(true)
+ .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
+ .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_VALID = new Relationship.Builder()
+ .name("valid")
+ .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
+ .build();
+ public static final Relationship REL_INVALID = new Relationship.Builder()
+ .name("invalid")
+ .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
+ private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(SCHEMA);
+ properties.add(HEADER);
+ properties.add(DELIMITER_CHARACTER);
+ properties.add(QUOTE_CHARACTER);
+ properties.add(END_OF_LINE_CHARACTER);
+ properties.add(VALIDATION_STRATEGY);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_VALID);
+ relationships.add(REL_INVALID);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ String schema = validationContext.getProperty(SCHEMA).getValue();
+ try {
+ this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
+ } catch (Exception e) {
+ final List<ValidationResult> problems = new ArrayList<>(1);
+ problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
+ .input(schema)
+ .valid(false)
+ .explanation("Error while parsing the schema: " + e.getMessage())
+ .build());
+ return problems;
+ }
+ return super.customValidate(validationContext);
+ }
+
+ @OnScheduled
+ public void setPreference(final ProcessContext context) {
+ // When going from the UI to Java, the characters are escaped so that what you
+ // input is transferred over to Java as is. So when you type the characters "\"
+ // and "n" into the UI the Java string will end up being those two characters
+ // not the interpreted value "\n".
+ final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+ this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
+ context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0), msgDemarcator).build());
+ }
+
+ /**
+ * Method used to parse the string supplied by the user. The string is converted
+ * to a list of cell processors used to validate the CSV data.
+ * @param schema Schema to parse
+ */
+ private void parseSchema(String schema) {
+ List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
+
+ String remaining = schema;
+ while(remaining.length() > 0) {
+ remaining = setProcessor(remaining, processorsList);
+ }
+
+ this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
+ }
+
+ private String setProcessor(String remaining, List<CellProcessor> processorsList) {
+ StringBuffer buffer = new StringBuffer();
+ int i = 0;
+ int opening = 0;
+ int closing = 0;
+ while(buffer.length() != remaining.length()) {
+ char c = remaining.charAt(i);
+ i++;
+
+ if(opening == 0 && c == ',') {
+ if(i == 1) {
+ continue;
+ }
+ break;
+ }
+
+ buffer.append(c);
+
+ if(c == '(') {
+ opening++;
+ } else if(c == ')') {
+ closing++;
+ }
+
+ if(opening > 0 && opening == closing) {
+ break;
+ }
+ }
+
+ final String procString = buffer.toString().trim();
+ opening = procString.indexOf('(');
+ String method = procString;
+ String argument = null;
+ if(opening != -1) {
+ argument = method.substring(opening + 1, method.length() - 1);
+ method = method.substring(0, opening);
+ }
+
+ processorsList.add(getProcessor(method.toLowerCase(), argument));
+
+ return remaining.substring(i);
+ }
+
+ private CellProcessor getProcessor(String method, String argument) {
+ switch (method) {
+
+ case "optional":
+ int opening = argument.indexOf('(');
+ String subMethod = argument;
+ String subArgument = null;
+ if(opening != -1) {
+ subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
+ subMethod = subMethod.substring(0, opening);
+ }
+ return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
+
+ case "parsedate":
+ return new ParseDate(argument.substring(1, argument.length() - 1));
+
+ case "parsedouble":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
+ return new ParseDouble();
+
+ case "parsebigdecimal":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
+ return new ParseBigDecimal();
+
+ case "parsebool":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
+ return new ParseBool();
+
+ case "parsechar":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
+ return new ParseChar();
+
+ case "parseint":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
+ return new ParseInt();
+
+ case "parselong":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
+ return new ParseLong();
+
+ case "notnull":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
+ return new NotNull();
+
+ case "strregex":
+ return new StrRegEx(argument.substring(1, argument.length() - 1));
+
+ case "unique":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
+ return new Unique();
+
+ case "uniquehashcode":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
+ return new UniqueHashCode();
+
+ case "strlen":
+ String[] splts = argument.split(",");
+ int[] requiredLengths = new int[splts.length];
+ for(int i = 0; i < splts.length; i++) {
+ requiredLengths[i] = Integer.parseInt(splts[i]);
+ }
+ return new Strlen(requiredLengths);
+
+ case "strminmax":
+ String[] splits = argument.split(",");
+ return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
+
+ case "lminmax":
+ String[] args = argument.split(",");
+ return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
+
+ case "dminmax":
+ String[] doubles = argument.split(",");
+ return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
+
+ case "equals":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
+ return new Equals();
+
+ case "forbidsubstr":
+ String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
+ return new ForbidSubStr(forbiddenSubStrings);
+
+ case "requiresubstr":
+ String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
+ return new RequireSubStr(requiredSubStrings);
+
+ case "strnotnullorempty":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
+ return new StrNotNullOrEmpty();
+
+ case "requirehashcode":
+ String[] hashs = argument.split(",");
+ int[] hashcodes = new int[hashs.length];
+ for(int i = 0; i < hashs.length; i++) {
+ hashcodes[i] = Integer.parseInt(hashs[i]);
+ }
+ return new RequireHashCode(hashcodes);
+
+ case "null":
+ if(argument != null && !argument.isEmpty())
+ throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
+ return null;
+
+ case "isincludedin":
+ String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
+ return new IsIncludedIn(elements);
+
+ default:
+ throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final CsvPreference csvPref = this.preference.get();
+ final boolean header = context.getProperty(HEADER).asBoolean();
+ final ComponentLog logger = getLogger();
+ final CellProcessor[] cellProcs = this.processors.get();
+ final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
+
+ final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
+ final AtomicReference<Boolean> isFirstLineValid = new AtomicReference<Boolean>(true);
+ final AtomicReference<Boolean> isFirstLineInvalid = new AtomicReference<Boolean>(true);
+ final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
+ final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
+ final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
+ final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
+
+ if(!isWholeFFValidation) {
+ invalidFF.set(session.create(flowFile));
+ validFF.set(session.create(flowFile));
+ }
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ NifiCsvListReader listReader = null;
+ try {
+ listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
+
+ // handling of header
+ if(header) {
+ List<String> headerList = listReader.read();
+ if(!isWholeFFValidation) {
+ invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(print(headerList, csvPref, isFirstLineInvalid.get()));
+ }
+ }));
+ validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(print(headerList, csvPref, isFirstLineValid.get()));
+ }
+ }));
+ isFirstLineValid.set(false);
+ isFirstLineInvalid.set(false);
+ }
+ }
+
+ boolean stop = false;
+
+ while (!stop) {
+ try {
+
+ final List<Object> list = listReader.read(cellProcs);
+ stop = list == null;
+
+ if(!isWholeFFValidation && !stop) {
+ validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(print(list, csvPref, isFirstLineValid.get()));
+ }
+ }));
+ okCount.set(okCount.get() + 1);
+
+ if(isFirstLineValid.get()) {
+ isFirstLineValid.set(false);
+ }
+ }
+
+ } catch (final SuperCsvException e) {
+ valid.set(false);
+ if(isWholeFFValidation) {
+ logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
+ break;
+ } else {
+ // we append the invalid line to the flow file that will be routed to invalid relationship
+ invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLineInvalid.get()));
+ }
+ }));
+
+ if(isFirstLineInvalid.get()) {
+ isFirstLineInvalid.set(false);
+ }
+ }
+ } finally {
+ if(!isWholeFFValidation) {
+ totalCount.set(totalCount.get() + 1);
+ }
+ }
+ }
+
+ } catch (final IOException e) {
+ valid.set(false);
+ logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
+ } finally {
+ if(listReader != null) {
+ listReader.close();
+ }
+ }
+ }
+ });
+
+ if(isWholeFFValidation) {
+ if (valid.get()) {
+ logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
+ session.getProvenanceReporter().route(flowFile, REL_VALID);
+ session.transfer(flowFile, REL_VALID);
+ } else {
+ session.getProvenanceReporter().route(flowFile, REL_INVALID);
+ session.transfer(flowFile, REL_INVALID);
+ }
+ } else {
+ if (valid.get()) {
+ logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
+ session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
+ session.transfer(validFF.get(), REL_VALID);
+ session.remove(invalidFF.get());
+ session.remove(flowFile);
+ } else if (okCount.get() != 0) {
+ // because of the finally within the 'while' loop
+ totalCount.set(totalCount.get() - 1);
+
+ logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
+ new Object[]{okCount.get(), totalCount.get(), flowFile});
+ session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
+ session.transfer(validFF.get(), REL_VALID);
+ session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
+ session.transfer(invalidFF.get(), REL_INVALID);
+ session.remove(flowFile);
+ } else {
+ logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
+ session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
+ session.transfer(invalidFF.get(), REL_INVALID);
+ session.remove(validFF.get());
+ session.remove(flowFile);
+ }
+ }
+ }
+
+ /**
+ * Method used to correctly write the lines by taking into account end of line
+ * character and separator character.
+ * @param list list of elements of the current row
+ * @param csvPref CSV preferences
+ * @param isFirstLine true if this is the first line we append
+ * @return String to append in the flow file
+ */
+ private byte[] print(List<?> list, CsvPreference csvPref, boolean isFirstLine) {
+ StringBuffer buffer = new StringBuffer();
+
+ if (!isFirstLine) {
+ buffer.append(csvPref.getEndOfLineSymbols());
+ }
+
+ final int size = list.size();
+ for(int i = 0; i < size; i++) {
+ buffer.append(list.get(i).toString());
+ if(i != size - 1) {
+ buffer.append((char) csvPref.getDelimiterChar());
+ }
+ }
+
+ return buffer.toString().getBytes();
+ }
+
+ /**
+ * This is required to avoid the side effect of Parse* cell processors. If not overriding
+ * this method, parsing will return objects and writing objects could result in a different
+ * output in comparison to the input.
+ */
+ private class NifiCsvListReader extends CsvListReader {
+
+ public NifiCsvListReader(Reader reader, CsvPreference preferences) {
+ super(reader, preferences);
+ }
+
+ @Override
+ public List<Object> read(CellProcessor... processors) throws IOException {
+ if( processors == null ) {
+ throw new NullPointerException("Processors should not be null");
+ }
+ if( readRow() ) {
+ super.executeProcessors(new ArrayList<Object>(getColumns().size()), processors);
+ return new ArrayList<Object>(getColumns());
+ }
+ return null; // EOF
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c1f9a77..716e736 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -87,5 +87,6 @@ org.apache.nifi.processors.standard.TailFile
org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml
+org.apache.nifi.processors.standard.ValidateCsv
org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
new file mode 100644
index 0000000..c708838
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ValidateCsv/additionalDetails.html
@@ -0,0 +1,109 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8"/>
+ <title>ValidateCsv</title>
+ <link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Usage Information</h2>
+
+<p>
+ The Validate CSV processor is based on the super-csv library and the concept of
+ <a href="http://super-csv.github.io/super-csv/cell_processors.html" target="_blank">Cell Processors</a>.
+ The corresponding java documentation can be found
+ <a href="http://super-csv.github.io/super-csv/apidocs/org/supercsv/cellprocessor/ift/CellProcessor.html" target="_blank">here</a>.
+</p>
+
+<p>
+ The cell processors cannot be nested (except with Optional which gives the possibility to define a CellProcessor for values
+ that could be null) and must be defined in a comma-delimited string as the Schema property.
+</p>
+
+<p>
+ The supported cell processors are:
+ <ul>
+ <li>ParseBigDecimal</li>
+ <li>ParseBool</li>
+ <li>ParseChar</li>
+ <li>ParseDate</li>
+ <li>ParseDouble</li>
+ <li>ParseInt</li>
+ <li>Optional</li>
+ <li>DMinMax</li>
+ <li>Equals</li>
+ <li>ForbidSubStr</li>
+ <li>LMinMax</li>
+ <li>NotNull</li>
+ <li>Null</li>
+ <li>RequireHashCode</li>
+ <li>RequireSubStr</li>
+ <li>Strlen</li>
+ <li>StrMinMax</li>
+ <li>StrNotNullOrEmpty</li>
+ <li>StrRegEx</li>
+ <li>Unique</li>
+ <li>UniqueHashCode</li>
+ <li>IsIncludedIn</li>
+ </ul>
+</p>
+
+<p>
+ Here are some examples:
+
+ <ul>
+ <b>Schema property:</b> Null, ParseDate("dd/MM/yyyy"), Optional(ParseDouble())<br />
+ <b>Meaning:</b> the input CSV has three columns, the first one can be null and has no specification, the second one must be a date
+ formatted as expected, and the third one must a double or null (no value).
+ </ul>
+
+ <ul>
+ <b>Schema property:</b> ParseBigDecimal(), ParseBool(), ParseChar(), ParseInt(), ParseLong()<br />
+ <b>Meaning:</b> the input CSV has five columns, the first one must be a big decimal, the second one must be a boolean,
+ the third one must be a char, the fourth one must be an integer and the fifth one must be a long.
+ </ul>
+
+ <ul>
+ <b>Schema property:</b> Equals(), NotNull(), StrNotNullOrEmpty()<br />
+ <b>Meaning:</b> the input CSV has three columns, all the values of the first column must be equal to each other, all the values
+ of the second column must be not null, and all the values of the third column are not null/empty string values.
+ </ul>
+
+ <ul>
+ <b>Schema property:</b> Strlen(4), StrMinMax(3,5), StrRegex("[a-z0-9\\._]+@[a-z0-9\\.]+")<br />
+ <b>Meaning:</b> the input CSV has three columns, all the values of the first column must be 4-characters long, all the values
+ of the second column must be between 3 and 5 characters (inclusive), and all the values of the last column must match
+ the provided regular expression (email address).
+ </ul>
+
+ <ul>
+ <b>Schema property:</b> Unique(), UniqueHashCode()<br />
+ <b>Meaning:</b> the input CSV has two columns. All the values of the first column must be unique (all the values are stored in
+ memory and this can be consuming depending of the input). All the values of the second column must be unique (only hash
+ codes of the input values are stored to ensure uniqueness).
+ </ul>
+
+ <ul>
+ <b>Schema property:</b> ForbidSubStr("test", "tset"), RequireSubStr("test")<br />
+ <b>Meaning:</b> the input CSV has two columns. None of the values in the first column must contain one of the provided strings.
+ And all the values of the second column must contain the provided string.
+ </ul>
+</p>
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d838f612/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
new file mode 100644
index 0000000..d3c6493
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestValidateCsv {
+
+ @Test
+ public void testHeaderAndSplit() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
+
+ runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
+ runner.run();
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
+
+ runner.clearTransferState();
+
+ runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
+ runner.run();
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
+
+ runner.clearTransferState();
+
+ runner.enqueue("Name,Birthdate,Weight\nJohn,22/111954,63.2\nBob,01/03/2004,45.0");
+ runner.run();
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nBob,01/03/2004,45.0");
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/111954,63.2");
+ }
+
+ @Test
+ public void testUniqueWithSplit() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
+
+ runner.enqueue("John\r\nBob\r\nBob\r\nJohn");
+ runner.run();
+
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob");
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
+ }
+
+ @Test
+ public void testValidDateOptionalDouble() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
+
+ runner.enqueue("John,22/11/1954,63.2\r\nBob,01/03/2004,45.0");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testIsIncludedIn() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), IsIncludedIn(\"male\", \"female\")");
+
+ runner.enqueue("John,22/11/1954,male\r\nMarie,01/03/2004,female");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testBigDecimalBoolCharIntLong() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "true");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "ParseBigDecimal(), ParseBool(), ParseChar(), ParseInt(), ParseLong()");
+
+ runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,92147483647,92147483647");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testEqualsNotNullStrNotNullOrEmpty() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Equals(), NotNull(), StrNotNullOrEmpty()");
+
+ runner.enqueue("test,test,test\r\ntest,test,test");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("test,test,test\r\ntset,test,test");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testStrlenStrMinMaxStrRegex() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Strlen(4), StrMinMax(3,5), StrRegex(\"[a-z0-9\\._]+@[a-z0-9\\.]+\")");
+
+ runner.enqueue("test,test,test@apache.org");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("test,test,testapache.org");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testDMinMaxForbidSubStrLMinMax() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "DMinMax(10,100),LMinMax(10,100),ForbidSubStr(\"test\", \"tset\")");
+
+ runner.enqueue("50.001,50,hello");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("10,10,testapache.org");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testUnique() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "Unique(), UniqueHashCode()");
+
+ runner.enqueue("1,2\r\n3,4");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("1,2\r\n1,4");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testRequire() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ int hashcode = "test".hashCode();
+ runner.setProperty(ValidateCsv.SCHEMA, "RequireHashCode(" + hashcode + "), RequireSubStr(\"test\")");
+
+ runner.enqueue("test,test");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
+
+ runner.enqueue("tset,tset");
+ runner.run();
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ }
+
+ @Test
+ public void testValidate() {
+ final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+
+ runner.setProperty(ValidateCsv.SCHEMA, "RequireSubString(\"test\")");
+ runner.assertNotValid();
+
+ runner.setProperty(ValidateCsv.SCHEMA, "''");
+ runner.assertNotValid();
+
+ runner.setProperty(ValidateCsv.SCHEMA, "\"\"");
+ runner.assertNotValid();
+ }
+
+}