Posted to dev@nifi.apache.org by pvillard31 <gi...@git.apache.org> on 2016/05/30 21:18:31 UTC

[GitHub] nifi pull request: NIFI-1942 Processor to validate CSV against use...

GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/476

    NIFI-1942 Processor to validate CSV against user-supplied schema

    This processor is designed to validate a CSV-formatted FlowFile against a user-supplied schema.
    
    It leverages cell processors from the Super CSV library and offers the following options for defining the expected schema:
    
    - ParseBigDecimal
    - ParseBool
    - ParseChar
    - ParseDate
    - ParseDouble
    - ParseInt
    - ParseLong
    - Optional
    - DMinMax
    - Equals
    - ForbidSubStr
    - LMinMax
    - NotNull
    - Null
    - RequireHashCode
    - RequireSubStr
    - Strlen
    - StrMinMax
    - StrNotNullOrEmpty
    - StrRegEx
    - Unique
    - UniqueHashCode
    
    Nested cell processors are not supported except with Optional.
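
To make the schema format concrete, here is a minimal sketch of how a comma-delimited schema string could be cut into one entry per column while keeping the arguments of a processor such as Optional(...) together. It is an illustration only, not code from the pull request: the class name SchemaSplitter and the method splitTopLevel are hypothetical, and the sketch assumes that commas and parentheses never appear inside quoted arguments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the pull request: splits a schema string
// into one entry per column by cutting only on commas outside parentheses.
final class SchemaSplitter {

    static List<String> splitTopLevel(String schema) {
        List<String> entries = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0; // number of currently open parentheses
        for (char c : schema.toCharArray()) {
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            }
            if (c == ',' && depth == 0) {
                // A top-level comma ends the current column definition.
                entries.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (depth != 0) {
            throw new IllegalArgumentException("Unbalanced parentheses in schema: " + schema);
        }
        entries.add(current.toString().trim());
        return entries;
    }

    public static void main(String[] args) {
        // Example schema using processor names from the list above.
        String schema = "ParseInt, Optional(StrMinMax(1,10)), ParseDate(\"dd/MM/yyyy\"), Null";
        for (String entry : splitTopLevel(schema)) {
            System.out.println(entry);
        }
    }
}
```

The example schema yields four entries, one per CSV column. The comma inside StrMinMax(1,10) is not treated as a column separator because it sits inside parentheses. A pattern argument that itself contains a comma or a parenthesis would need extra handling that this sketch leaves out.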

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi validate-csv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #476
    
----
commit 30b8ad2e3dddbad07ce39e54db78f9426aed6001
Author: Pierre Villard <pi...@gmail.com>
Date:   2016-05-27T08:05:16Z

    NIFI-1942 Processor to validate CSV against user-supplied schema

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71088067
  
    --- Diff: nifi-assembly/NOTICE ---
    @@ -193,6 +193,8 @@ The following binary components are provided under the Apache Software License v
     
       (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
     
    +  (ASLv2) Super CSV (net.sf.supercsv:super-csv:2.4.0)
    --- End diff --
    
    This is an Apache 2.0 licensed import with no NOTICE, so it can be used without adding anything to the NOTICE or LICENSE file. This applies to both this assembly NOTICE and the nar NOTICE.



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70716978
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<List<CellProcessor>> processors = new AtomicReference<List<CellProcessor>>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        this.processors.set(new ArrayList<CellProcessor>());
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining);
    +        }
    +    }
    +
    +    private String setProcessor(String remaining) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        this.processors.get().add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +        case "optional":
    +            int opening = argument.indexOf('(');
    +            String subMethod = argument;
    +            String subArgument = null;
    +            if(opening != -1) {
    +                subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                subMethod = subMethod.substring(0, opening);
    +            }
    +            return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +        case "parsedate":
    +            return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +        case "parsedouble":
    +            return new ParseDouble();
    +
    +        case "parsebigdecimal":
    +            return new ParseBigDecimal();
    +
    +        case "parsebool":
    +            return new ParseBool();
    +
    +        case "parsechar":
    +            return new ParseChar();
    +
    +        case "parseint":
    +            return new ParseInt();
    +
    +        case "parselong":
    +            return new ParseLong();
    +
    +        case "notnull":
    +            return new NotNull();
    +
    +        case "strregex":
    +            return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +        case "unique":
    +            return new Unique();
    +
    +        case "uniquehashcode":
    +            return new UniqueHashCode();
    +
    +        case "strlen":
    +            String[] splts = argument.split(",");
    +            int[] requiredLengths = new int[splts.length];
    +            for(int i = 0; i < splts.length; i++) {
    +                requiredLengths[i] = Integer.parseInt(splts[i]);
    +            }
    +            return new Strlen(requiredLengths);
    +
    +        case "strminmax":
    +            String[] splits = argument.split(",");
    +            return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +        case "lminmax":
    +            String[] args = argument.split(",");
    +            return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +        case "dminmax":
    +            String[] doubles = argument.split(",");
    +            return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +        case "equals":
    +            return new Equals();
    +
    +        case "forbidsubstr":
    +            String[] forbiddenSubStrings = argument.split(",");
    +            for (int i = 0; i < forbiddenSubStrings.length; i++) {
    +                forbiddenSubStrings[i] = forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
    +            }
    +            return new ForbidSubStr(forbiddenSubStrings);
    +
    +        case "requiresubstr":
    +            String[] requiredSubStrings = argument.split(",");
    +            for (int i = 0; i < requiredSubStrings.length; i++) {
    +                requiredSubStrings[i] = requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
    +            }
    +            return new RequireSubStr(requiredSubStrings);
    +
    +        case "strnotnullorempty":
    +            return new StrNotNullOrEmpty();
    +
    +        case "requirehashcode":
    +            String[] hashs = argument.split(",");
    +            int[] hashcodes = new int[hashs.length];
    +            for(int i = 0; i < hashs.length; i++) {
    +                hashcodes[i] = Integer.parseInt(hashs[i]);
    +            }
    +            return new RequireHashCode(hashcodes);
    +
    +        case "null":
    +            return null;
    +
    +        default:
    +            throw new IllegalArgumentException(method + " is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final List<FlowFile> flowFiles = session.get(50);
    --- End diff --
    
    I would suggest not doing internal batching here. Some older processors did this because "@SupportsBatching" didn't exist yet. You already have this annotation, so dropping the internal batching lets the user set the batching duration instead of hiding it inside the implementation.
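
One way to picture the suggestion is a trigger method that takes a single queued item per invocation and returns early when nothing is waiting, rather than pulling up to 50 at once. The sketch below is an assumption-laden illustration, not the processor's code: FakeSession is a hypothetical stand-in for NiFi's ProcessSession, and plain strings stand in for FlowFiles so that the example compiles without NiFi on the classpath.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Stand-in types for illustration only; the real NiFi classes are
// ProcessSession and FlowFile, which are not reproduced here.
final class SingleItemTrigger {

    /** Hypothetical stand-in for the part of a session that hands out queued work. */
    static final class FakeSession {
        private final Queue<String> queue = new ArrayDeque<>();
        int transferred = 0;

        void enqueue(String content) {
            queue.add(content);
        }

        /** Returns the next queued item, or null when the queue is empty. */
        String get() {
            return queue.poll();
        }

        /** Counts the item as routed; a real session would move it to a relationship. */
        void transfer(String item) {
            transferred++;
        }
    }

    /** Handles at most one item per invocation instead of pulling a batch of 50. */
    static boolean onTrigger(FakeSession session) {
        String item = session.get();
        if (item == null) {
            return false; // nothing queued; the caller can simply invoke again later
        }
        session.transfer(item);
        return true;
    }
}
```

With this shape, how many items are grouped together is left to the caller (in NiFi, to the framework and the user-configured batching duration) rather than to a constant buried in the method.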



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72538224
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuilder buffer = new StringBuilder();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        // bound the scan by the index: a skipped leading comma means the buffer never reaches remaining.length()
    +        while(i < remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(list, csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    +                            valid.set(false);
    +                            if(isWholeFFValidation) {
    +                                logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    +                                break;
    +                            } else {
    +                                // we append the invalid line to the flow file that will be routed to invalid relationship
    +                                invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +                        } finally {
    +                            if(!isWholeFFValidation) {
    +                                totalCount.set(totalCount.get() + 1);
    +                            }
    +                        }
    +                    }
    +
    +                } catch (final IOException e) {
    +                    valid.set(false);
    +                    logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
    +                } finally {
    +                    if(listReader != null) {
    +                        listReader.close();
    +                    }
    +                }
    +            }
    +        });
    +
    +        if(isWholeFFValidation) {
    +            if (valid.get()) {
    +                logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
    +                session.getProvenanceReporter().route(flowFile, REL_VALID);
    +                session.transfer(flowFile, REL_VALID);
    +            } else {
    +                session.getProvenanceReporter().route(flowFile, REL_INVALID);
    +                session.transfer(flowFile, REL_INVALID);
    +            }
    +        } else {
    +            if (valid.get()) {
    +                logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
    +                session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
    +                session.transfer(validFF.get(), REL_VALID);
    +                session.remove(invalidFF.get());
    +                session.remove(flowFile);
    +            } else if (okCount.get() != 0) {
    +                if(header) {
    +                    totalCount.set(totalCount.get() - 1);
    --- End diff --
    
    This gets decremented here but (I may just be missing it) I don't see where the header is added to totalCount.
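
To make the counting concrete, here is a minimal stand-alone sketch of how the read loop in the diff above appears to update totalCount. It assumes the loop is read correctly: the header is consumed by a separate listReader.read() call before the loop, and the finally block increments the counter on every pass, including the last pass where the reader returns null. The names TotalCountSketch and countLoopIterations are made up for illustration and are not part of the PR.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class TotalCountSketch {

    // Mimics the shape of the read loop: one increment per pass, including
    // the final pass in which the (simulated) reader signals end of input.
    static int countLoopIterations(List<String> dataRows) {
        Iterator<String> rows = dataRows.iterator();
        int totalCount = 0;
        boolean stop = false;
        while (!stop) {
            try {
                // null stands in for listReader.read(...) returning null at end of input
                String row = rows.hasNext() ? rows.next() : null;
                stop = row == null;
            } finally {
                // runs on every pass, like the finally block in the diff
                totalCount++;
            }
        }
        return totalCount;
    }

    public static void main(String[] args) {
        // the header line is assumed to have been read before the loop, so it is not in this list
        List<String> dataRows = Arrays.asList("1;a", "2;b", "3;c");
        System.out.println("data rows   = " + dataRows.size());
        System.out.println("total count = " + countLoopIterations(dataRows));
    }
}
```

With three data rows the sketch ends with a total of 4, so under these assumptions the extra unit would come from the end-of-input pass rather than from the header.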


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70715212
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    --- End diff --
    
    Is there some sort of example CSV schema or other documentation we can point to here? I know nothing about CSV validation, and if I were given the task to add/configure this processor I would be completely lost.
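
For illustration only, and judging from the schema-parsing code in this pull request rather than from any official documentation, a schema seems to be a comma-separated list of cell processor names, one per column, with any arguments in parentheses, for example: ParseInt, Optional(ParseDate("dd/MM/yyyy")), Strlen(2,4). The sketch below splits such a string on top-level commas using only the JDK. SchemaSplitSketch and splitTopLevel are hypothetical names; the processor itself maps each token to a Super CSV cell processor instead of printing it.

```java
import java.util.ArrayList;
import java.util.List;

class SchemaSplitSketch {

    // Splits a schema string on commas that are not enclosed in parentheses.
    // Simplification: parentheses inside a quoted argument (e.g. a regex)
    // would confuse this depth counter.
    static List<String> splitTopLevel(String schema) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        for (int i = 0; i < schema.length(); i++) {
            char c = schema.charAt(i);
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            }
            if (c == ',' && depth == 0) {
                addIfNotBlank(tokens, current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(tokens, current.toString());
        return tokens;
    }

    private static void addIfNotBlank(List<String> tokens, String raw) {
        String token = raw.trim();
        if (!token.isEmpty()) {
            tokens.add(token);
        }
    }

    public static void main(String[] args) {
        // one cell processor per CSV column, in column order
        String schema = "ParseInt, Optional(ParseDate(\"dd/MM/yyyy\")), Strlen(2,4)";
        for (String token : splitTopLevel(schema)) {
            System.out.println(token);
        }
    }
}
```

Read this way, the example schema would describe a three-column file: an integer, an optional date in dd/MM/yyyy format, and a string whose length is 2 or 4.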


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70718276
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. It is expected to be a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    --- End diff --
    
    Shouldn't the default delimiter for a "Comma Separated Values" file be a comma?


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72540382
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(list, csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    +                            valid.set(false);
    +                            if(isWholeFFValidation) {
    +                                logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    +                                break;
    +                            } else {
    +                                // we append the invalid line to the flow file that will be routed to invalid relationship
    +                                invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +                        } finally {
    +                            if(!isWholeFFValidation) {
    +                                totalCount.set(totalCount.get() + 1);
    +                            }
    +                        }
    +                    }
    +
    +                } catch (final IOException e) {
    +                    valid.set(false);
    +                    logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
    +                } finally {
    +                    if(listReader != null) {
    +                        listReader.close();
    +                    }
    +                }
    +            }
    +        });
    +
    +        if(isWholeFFValidation) {
    +            if (valid.get()) {
    +                logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
    --- End diff --
    
    This message will probably be logged too often to be at "info" level. "debug" would also match the other paths in this block.



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
GitHub user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71247145
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -408,9 +407,12 @@ public void process(final InputStream in) throws IOException {
                             listReader.read();
                         }
                         while(listReader.read(cellProcs) != null) {}
    -                } catch (final IOException | SuperCsvCellProcessorException e) {
    +                } catch (final IOException e) {
                         valid.set(false);
                         logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
    +                } catch (final SuperCsvCellProcessorException e) {
    +                    valid.set(false);
    +                    logger.info("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    --- End diff --
    
    Actually, some form of this message should be added as an attribute on invalid FlowFiles, so that users can see what is actually invalid.
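
The suggestion above could look roughly like the following sketch. It is self-contained and uses a plain `Map` as a stand-in for FlowFile attributes. The attribute name `validation.error.message` and the class `InvalidReasonSketch` are assumptions made for illustration and are not defined by the pull request. In the processor itself the same idea would presumably go through `ProcessSession.putAttribute` just before routing to 'invalid'.

```java
import java.util.HashMap;
import java.util.Map;

final class InvalidReasonSketch {

    // Hypothetical attribute name; the pull request does not define one.
    static final String ERROR_ATTRIBUTE = "validation.error.message";

    // Returns a copy of the attributes with the failure reason attached.
    // The input map is left untouched.
    static Map<String, String> withValidationError(Map<String, String> attributes, Exception cause) {
        Map<String, String> updated = new HashMap<>(attributes);
        updated.put(ERROR_ATTRIBUTE, String.valueOf(cause.getMessage()));
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", "people.csv");
        try {
            // Stands in for a cell processor rejecting a value.
            Integer.parseInt("abc");
        } catch (NumberFormatException e) {
            attributes = withValidationError(attributes, e);
        }
        // Downstream components can now read the reason from the attribute.
        System.out.println(attributes.get(ERROR_ATTRIBUTE));
    }
}
```

Carrying the reason as an attribute keeps it attached to the data, so a flow can route or report on it without anyone having to search the logs.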



[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
GitHub user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Thanks @JPercivall !
    I made the change in the ``@OnScheduled`` method and it is now working as expected. I also took the liberty of squashing my commits. Let me know if anything else is needed.



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
GitHub user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72543424
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(list, csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    --- End diff --
    
    This should probably also catch `SuperCsvException`. I ran into one when the number of cell processors in my schema didn't match the number of columns in the CSV.
    
    
    ![screen shot 2016-07-27 at 7 35 46 pm](https://cloud.githubusercontent.com/assets/11302527/17196264/5594b3c2-5431-11e6-8014-b8f0743aa107.png)
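    To make the point concrete, here is a minimal sketch of the catch ordering being suggested. It assumes (as in Super CSV) that the cell-processor exception is a subclass of the general CSV exception; the classes `CsvException` and `CellProcessorException` and the method `readRow` are hypothetical stand-ins, not the real library API, so that the example runs on its own.

```java
import java.util.Arrays;
import java.util.List;

class CatchOrderSketch {

    // Hypothetical stand-in for the general Super CSV exception.
    static class CsvException extends RuntimeException {
        CsvException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the cell processor exception (a subclass).
    static class CellProcessorException extends CsvException {
        CellProcessorException(String message) {
            super(message);
        }
    }

    // Stand-in for reading one row with a fixed number of cell processors.
    static void readRow(List<String> row, int processorCount) {
        if (row.size() != processorCount) {
            // Column count mismatch: raised as the general exception.
            throw new CsvException("columns (" + row.size()
                    + ") do not match cell processors (" + processorCount + ")");
        }
        for (String cell : row) {
            if (cell.isEmpty()) {
                // A single cell failed its constraint.
                throw new CellProcessorException("empty cell");
            }
        }
    }

    // Catch the specific exception first, then the general one.
    static String classify(List<String> row, int processorCount) {
        try {
            readRow(row, processorCount);
            return "valid";
        } catch (CellProcessorException e) {
            return "invalid cell";
        } catch (CsvException e) {
            return "invalid row";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(Arrays.asList("a", "b"), 2));
        System.out.println(classify(Arrays.asList("a", ""), 2));
        System.out.println(classify(Arrays.asList("a", "b", "c"), 2));
    }
}
```

    With only the first catch clause, the third call would let the exception escape instead of marking the row as invalid.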



---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @JPercivall 
    - Reverted NOTICE changes
    - Separated the exception and changed the logging approach when there is an invalid CSV
    - Changed the error message shown when the user provides a wrong method in the schema property and added a test (let me know if this is not what you had in mind)
    - Added IsIncludedIn cell processor (my bad!).
    - Added list of supported cell processors in additional documentation.
    
    For the other cell processors, I am not sure how they would fit into a validation process. If I am missing something here, let me know. There are interesting cell processors that modify CSV content by writing a new CSV file (a new FlowFile). That could be nice, but it goes beyond validation, so I assume it would be better in a different processor, no?
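    On the schema-error point in the list above, a rough sketch of the idea follows. It is not the code from the PR: the class `SchemaCheckSketch` and its methods are hypothetical names, and it only checks top-level method names (it does not look inside `Optional(...)`). The allowed names are copied from the `allowedOperators` list in the diff. It also assumes parentheses in the schema are balanced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SchemaCheckSketch {

    // Names copied from the allowedOperators list in the diff.
    static final List<String> ALLOWED = Arrays.asList(
            "ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax",
            "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax",
            "StrNotNullOrEmpty", "StrRegEx", "Unique", "UniqueHashCode",
            "IsIncludedIn");

    // Splits the schema on commas that are not inside parentheses.
    static List<String> splitTopLevel(String schema) {
        List<String> entries = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        for (char c : schema.toCharArray()) {
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            }
            if (c == ',' && depth == 0) {
                addEntry(entries, current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addEntry(entries, current.toString());
        return entries;
    }

    private static void addEntry(List<String> entries, String raw) {
        String trimmed = raw.trim();
        if (!trimmed.isEmpty()) {
            entries.add(trimmed);
        }
    }

    // Returns the first top-level method name that is not allowed, or null.
    static String firstUnknownMethod(String schema) {
        for (String entry : splitTopLevel(schema)) {
            int open = entry.indexOf('(');
            String name = (open == -1 ? entry : entry.substring(0, open)).trim();
            boolean known = false;
            for (String allowed : ALLOWED) {
                if (allowed.equalsIgnoreCase(name)) {
                    known = true;
                    break;
                }
            }
            if (!known) {
                return name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(splitTopLevel("ParseInt(), Optional(StrMinMax(1,5)), Null"));
        System.out.println(firstUnknownMethod("ParseInt(), Foo()"));
    }
}
```

    A non-null result from `firstUnknownMethod` is what a validation message could report back to the user, ideally together with the list of allowed names.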


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72542364
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(list, csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    +                            valid.set(false);
    +                            if(isWholeFFValidation) {
    +                                logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    +                                break;
    +                            } else {
    +                                // we append the invalid line to the flow file that will be routed to invalid relationship
    +                                invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +                        } finally {
    +                            if(!isWholeFFValidation) {
    +                                totalCount.set(totalCount.get() + 1);
    +                            }
    +                        }
    +                    }
    +
    +                } catch (final IOException e) {
    +                    valid.set(false);
    +                    logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
    +                } finally {
    +                    if(listReader != null) {
    +                        listReader.close();
    +                    }
    +                }
    +            }
    +        });
    +
    +        if(isWholeFFValidation) {
    +            if (valid.get()) {
    +                logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
    +                session.getProvenanceReporter().route(flowFile, REL_VALID);
    +                session.transfer(flowFile, REL_VALID);
    +            } else {
    +                session.getProvenanceReporter().route(flowFile, REL_INVALID);
    +                session.transfer(flowFile, REL_INVALID);
    +            }
    +        } else {
    +            if (valid.get()) {
    +                logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
    +                session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
    +                session.transfer(validFF.get(), REL_VALID);
    +                session.remove(invalidFF.get());
    +                session.remove(flowFile);
    +            } else if (okCount.get() != 0) {
    +                if(header) {
    +                    totalCount.set(totalCount.get() - 1);
    +                }
    +
    +                logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
    +                        new Object[]{okCount.get(), totalCount.get(), flowFile});
    +                session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
    +                session.transfer(validFF.get(), REL_VALID);
    +                session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
    +                session.transfer(invalidFF.get(), REL_INVALID);
    +                session.remove(flowFile);
    +            } else {
    +                logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
    +                session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
    +                session.transfer(invalidFF.get(), REL_INVALID);
    +                session.remove(validFF.get());
    +                session.remove(flowFile);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Method used to correctly write the lines by taking into account end of line
    +     * character and separator character.
    +     * @param list list of elements of the current row
    +     * @param csvPref CSV preferences
    +     * @param isFirstLine true if this is the first line we append
    +     * @return String to append in the flow file
    +     */
    +    private byte[] print(List<?> list, CsvPreference csvPref, boolean isFirstLine) {
    --- End diff --
    
    Not sure why, but after being routed through the processor, the literal characters "\" and "n" were added instead of the newline character '\n'. Below are two screenshots of the data before and after (only the line routed to invalid).
    
    ![screen shot 2016-07-27 at 7 23 52 pm](https://cloud.githubusercontent.com/assets/11302527/17196060/e4267118-542f-11e6-8831-83f8389a615a.png)
    ![screen shot 2016-07-27 at 7 23 55 pm](https://cloud.githubusercontent.com/assets/11302527/17196059/e424d600-542f-11e6-8e35-5f1274912ef1.png)
     


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71243836
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -408,9 +407,12 @@ public void process(final InputStream in) throws IOException {
                             listReader.read();
                         }
                         while(listReader.read(cellProcs) != null) {}
    -                } catch (final IOException | SuperCsvCellProcessorException e) {
    +                } catch (final IOException e) {
                         valid.set(false);
                         logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
    +                } catch (final SuperCsvCellProcessorException e) {
    +                    valid.set(false);
    +                    logger.info("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    --- End diff --
    
    This should be debug, not info: it will occur as part of normal operation and is only really relevant for debugging.


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    @pvillard31, I know what the problem with the end-of-line characters is. When a value goes from the UI to Java, the characters are escaped so that what you input is passed to Java as is. So when you type the characters "\" and "n" into the UI, the Java string ends up being those two characters, *not* the interpreted value "\n".
    
    There has been some discussion about this before and about the need for a change, but it hasn't been a top priority. For now, the approach is something like what is done here[1]: the default value is escaped, and it is then interpreted either in the OnScheduled method[2] or in a separate method[3].
    
    [1] https://github.com/apache/nifi/blob/1373bf672586ba5ddcfa697c45c832ccc79425cb/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java#L61-L61
    [2] https://github.com/apache/nifi/blob/1373bf672586ba5ddcfa697c45c832ccc79425cb/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java#L97-L97
    [3] https://github.com/apache/nifi/blob/cd846c8d627efb2606f72b6af009358dec27be63/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java#L566-L566
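    As a rough illustration of the interpretation step described above, here is a minimal, self-contained sketch. The class and method names (EolUnescapeSketch, unescape) are hypothetical and are not taken from the linked NiFi classes; the sketch only assumes that the property value arrives as the two characters "\" and "n".

```java
// Hypothetical sketch: not the code from the linked NiFi processors.
final class EolUnescapeSketch {

    // Converts the escaped sequences a user types in the UI (backslash + letter)
    // into the corresponding control characters.
    // Limitation: an escaped backslash (two backslashes followed by 'n') is not handled.
    static String unescape(final String raw) {
        return raw.replace("\\r", "\r")
                  .replace("\\n", "\n")
                  .replace("\\t", "\t");
    }

    public static void main(final String[] args) {
        final String fromUi = "\\n"; // what the property holds: backslash followed by 'n'
        System.out.println(fromUi.length());           // 2: two literal characters
        System.out.println(unescape(fromUi).length()); // 1: a single newline character
    }
}
```

    In ValidateCsv, a conversion like this would presumably be applied to the "End of line symbols" property value before it is handed to CsvPreference.Builder, for example in the @OnScheduled method.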


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @pvillard31, looks like this has merge conflicts. Could you address them?


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70719085
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. A comma-delimited string representing " +
    +                    "the cell processors to apply is expected. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<List<CellProcessor>> processors = new AtomicReference<List<CellProcessor>>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        this.processors.set(new ArrayList<CellProcessor>());
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining);
    +        }
    +    }
    +
    +    private String setProcessor(String remaining) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        this.processors.get().add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +        case "optional":
    +            int opening = argument.indexOf('(');
    +            String subMethod = argument;
    +            String subArgument = null;
    +            if(opening != -1) {
    +                subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                subMethod = subMethod.substring(0, opening);
    +            }
    +            return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +        case "parsedate":
    +            return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +        case "parsedouble":
    +            return new ParseDouble();
    --- End diff --
    
    Since this method is being used to validate, all of these no-param methods should validate that there are no arguments being passed to them.
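
A minimal sketch of the check suggested above, assuming a shared helper is acceptable. The class name `NoArgCheck` and the method `requireNoArgument` are hypothetical and not part of the PR; the error text mirrors the wording the later revision of the diff uses.

```java
public final class NoArgCheck {

    // Hypothetical helper (not in the PR): rejects any argument supplied
    // to a cell processor that takes no parameters, e.g. "ParseDouble(5)".
    public static void requireNoArgument(String processorName, String argument) {
        if (argument != null && !argument.isEmpty()) {
            throw new IllegalArgumentException(
                    processorName + " does not expect any argument but has " + argument);
        }
    }

    public static void main(String[] args) {
        requireNoArgument("ParseDouble", null); // "ParseDouble" is accepted
        requireNoArgument("ParseDouble", "");   // "ParseDouble()" is accepted
        try {
            requireNoArgument("ParseDouble", "5"); // "ParseDouble(5)" is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Calling one helper from each no-parameter case would keep the switch statement short, at the cost of one more private method in the processor.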


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    @pvillard31 I left a couple comments. Do you by chance have a template and/or example csv data I can use to validate the processor?


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @JPercivall, thanks for the review! I believe I addressed all of your comments. I also added the new dependency in NOTICE files (let me know if it is OK). To validate the processor I used a simple flow to generate CSV data with a ReplaceText processor: I'm gonna add the template to the JIRA right now.


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/476


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71088596
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,433 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. A comma-delimited string representing " +
    +                    "the cell processors to apply is expected. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.split(",");
    +                for (int i = 0; i < forbiddenSubStrings.length; i++) {
    +                    forbiddenSubStrings[i] = forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
    +                }
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.split(",");
    +                for (int i = 0; i < requiredSubStrings.length; i++) {
    +                    requiredSubStrings[i] = requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
    +                }
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            default:
    +                throw new IllegalArgumentException(method + " is not an allowed method to define a Cell Processor");
    --- End diff --
    
    This should have quotes around "method". If a user has two commas in a row it can be quite confusing.
    
    Also an explicit check if "method" is empty (suggesting the user has two commas in a row) would be nice.
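
A rough sketch of both suggestions, under the assumption that the message is built in one place. The class `UnknownMethodMessage` and the method `describe` are hypothetical names used for illustration only; the wording of the empty-name message is likewise an assumption.

```java
public final class UnknownMethodMessage {

    // Hypothetical helper (not in the PR): builds the error text for a schema
    // token that does not match any known cell processor.
    public static String describe(String method) {
        if (method == null || method.trim().isEmpty()) {
            // An empty name usually means the schema contains ",," somewhere.
            return "Empty cell processor name in the schema; check for two consecutive commas";
        }
        // Quoting the token makes stray whitespace or odd characters visible.
        return "\"" + method + "\" is not an allowed method to define a Cell Processor";
    }

    public static void main(String[] args) {
        System.out.println(describe("parsefloat"));
        System.out.println(describe(""));
    }
}
```

In the processor, the `default` branch could then throw `new IllegalArgumentException(describe(method))` instead of concatenating the raw token.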


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    +1
    
    Visually verified the code and did a contrib check build. In a standalone instance, I ran multiple different schemas through ValidateCSV, testing each of the properties; all worked as expected. Thanks @pvillard31, I will merge it in.


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @JPercivall, I finally found the time to address your comment. I agree that offering the possibility to filter out the "wrong" lines is a nice feature. I chose the option to have one flow file containing the valid lines and one containing the invalid lines. WRT this choice, I am not sure it makes sense to add an attribute with an error message, since the error could be different for each invalid line. Let me know your thoughts! Thanks!
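
To illustrate the splitting strategy described above, here is a rough sketch in plain Java. It is not the PR's implementation: `LineSplitter`, `SAMPLE_RULE`, and the `;`-separated sample data are assumptions made for the example, and in the processor the rule would come from the configured cell processors rather than a hard-coded predicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: partitions CSV lines into a 'valid' and an 'invalid'
// bucket, one bucket per outgoing flow file.
class LineSplitter {

    // Stand-in rule: exactly two columns, the second one an integer.
    static final Predicate<String> SAMPLE_RULE = line -> {
        String[] cols = line.split(";", -1);
        return cols.length == 2 && cols[1].matches("-?\\d+");
    };

    final List<String> valid = new ArrayList<>();
    final List<String> invalid = new ArrayList<>();

    // Routes each line to one of the two buckets, preserving input order.
    static LineSplitter split(List<String> lines, Predicate<String> rule) {
        LineSplitter result = new LineSplitter();
        for (String line : lines) {
            (rule.test(line) ? result.valid : result.invalid).add(line);
        }
        return result;
    }

    public static void main(String[] args) {
        LineSplitter r = split(Arrays.asList("john;42", "jane;abc", "bob"), SAMPLE_RULE);
        System.out.println("valid: " + r.valid);
        System.out.println("invalid: " + r.invalid);
    }
}
```

In this sample, `jane;abc` and `bob` are rejected for different reasons (a non-numeric value and a missing column), which is the situation where a single error attribute on the 'invalid' flow file could not describe every line.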


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70718795
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing" +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<List<CellProcessor>> processors = new AtomicReference<List<CellProcessor>>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        this.processors.set(new ArrayList<CellProcessor>());
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining);
    +        }
    +    }
    +
    +    private String setProcessor(String remaining) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        this.processors.get().add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +        case "optional":
    --- End diff --
    
    nit pick: the cases should be indented a level from the switch statement


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    I'm trying to use a couple of the processors listed in the accompanying doc you link to [1], but there seem to be some processors that aren't available (notably "IsIncludedIn" [2]). I was trying to make sure a column had a value in a set of strings ("male" or "female").
    
    Is there a reason for not including all the processors available?
    
    [1] http://super-csv.github.io/super-csv/cell_processors.html
    [2] http://super-csv.github.io/super-csv/apidocs/org/supercsv/cellprocessor/constraint/IsIncludedIn.html


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72541263
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    --- End diff --
    
    These property names are a bit too long; they don't fit in the drop-down for the allowable values. Any way to shorten them?
    
    ![screen shot 2016-07-27 at 7 12 47 pm](https://cloud.githubusercontent.com/assets/11302527/17195852/6be2fc36-542e-11e6-9488-a5aa905a1873.png)



---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72539247
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    --- End diff --
    
    I like calling out the Unique processor here, but it'd be nice to be more explicit about why it matters and what happens if you do use it.
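
One possible reading of the caveat, sketched in plain Java under the assumption that a Unique-style check remembers every value it has already seen while reading the file. `UniqueLineDemo` is a hypothetical stand-in, not the Super CSV class or the PR's code. Under that assumption, when lines are validated individually, the first line carrying a value is treated as valid and only the later repeats end up in the 'invalid' output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a Unique-style cell processor applied line by
// line: it keeps the set of values seen so far and rejects repeats.
class UniqueLineDemo {

    final List<String> valid = new ArrayList<>();
    final List<String> invalid = new ArrayList<>();

    // Checks the first column of each line for uniqueness, in input order.
    static UniqueLineDemo validate(List<String> lines) {
        UniqueLineDemo out = new UniqueLineDemo();
        Set<String> seen = new HashSet<>();
        for (String line : lines) {
            String key = line.split(";", -1)[0];
            // Set.add returns false when the value was already present.
            if (seen.add(key)) {
                out.valid.add(line);
            } else {
                out.invalid.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        UniqueLineDemo r = validate(Arrays.asList("1;john", "2;jane", "1;bob"));
        System.out.println("valid: " + r.valid);
        System.out.println("invalid: " + r.invalid);
    }
}
```

In this sketch the line `1;john` is kept as valid even though its key is not unique in the file as a whole, which may not be what a user expects from a uniqueness constraint.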


---

[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @JPercivall, I think I've addressed most of your comments:
    - changed log level
    - fixed the decrement (it was not related to the header but rather to the 'finally' in the 'while' loop)
    - strategy naming and description
    - fixed first line handling and added a unit test for that and to confirm Unique() behavior when validating line by line
    - changed exception catching
    
    What remains is what you observed when displaying the content of flow files after they have been processed by the processor. I've reproduced your observation but I didn't find any explanation. By any chance, do you know if there is some specific encoding related to the UI display? (I remember some discussions regarding how the carriage return is processed (shift + enter instead of \n) when used in a property in some processors.) If you have ideas, let me know!
    
    Thanks.


---

[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70716478
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing" +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ;")
    +            .required(true)
    +            .defaultValue(";")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<List<CellProcessor>> processors = new AtomicReference<List<CellProcessor>>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        this.processors.set(new ArrayList<CellProcessor>());
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining);
    +        }
    +    }
    +
    +    private String setProcessor(String remaining) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        this.processors.get().add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +        case "optional":
    +            int opening = argument.indexOf('(');
    +            String subMethod = argument;
    +            String subArgument = null;
    +            if(opening != -1) {
    +                subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                subMethod = subMethod.substring(0, opening);
    +            }
    +            return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +        case "parsedate":
    +            return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +        case "parsedouble":
    +            return new ParseDouble();
    +
    +        case "parsebigdecimal":
    +            return new ParseBigDecimal();
    +
    +        case "parsebool":
    +            return new ParseBool();
    +
    +        case "parsechar":
    +            return new ParseChar();
    +
    +        case "parseint":
    +            return new ParseInt();
    +
    +        case "parselong":
    +            return new ParseLong();
    +
    +        case "notnull":
    +            return new NotNull();
    +
    +        case "strregex":
    +            return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +        case "unique":
    +            return new Unique();
    +
    +        case "uniquehashcode":
    +            return new UniqueHashCode();
    +
    +        case "strlen":
    +            String[] splts = argument.split(",");
    +            int[] requiredLengths = new int[splts.length];
    +            for(int i = 0; i < splts.length; i++) {
    +                requiredLengths[i] = Integer.parseInt(splts[i]);
    +            }
    +            return new Strlen(requiredLengths);
    +
    +        case "strminmax":
    +            String[] splits = argument.split(",");
    +            return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +        case "lminmax":
    +            String[] args = argument.split(",");
    +            return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +        case "dminmax":
    +            String[] doubles = argument.split(",");
    +            return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +        case "equals":
    +            return new Equals();
    +
    +        case "forbidsubstr":
    +            String[] forbiddenSubStrings = argument.split(",");
    +            for (int i = 0; i < forbiddenSubStrings.length; i++) {
    +                forbiddenSubStrings[i] = forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
    +            }
    +            return new ForbidSubStr(forbiddenSubStrings);
    +
    +        case "requiresubstr":
    +            String[] requiredSubStrings = argument.split(",");
    +            for (int i = 0; i < requiredSubStrings.length; i++) {
    +                requiredSubStrings[i] = requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
    +            }
    +            return new RequireSubStr(requiredSubStrings);
    +
    +        case "strnotnullorempty":
    +            return new StrNotNullOrEmpty();
    +
    +        case "requirehashcode":
    +            String[] hashs = argument.split(",");
    +            int[] hashcodes = new int[hashs.length];
    +            for(int i = 0; i < hashs.length; i++) {
    +                hashcodes[i] = Integer.parseInt(hashs[i]);
    +            }
    +            return new RequireHashCode(hashcodes);
    +
    +        case "null":
    +            return null;
    +
    +        default:
    +            throw new IllegalArgumentException(method + " is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final List<FlowFile> flowFiles = session.get(50);
    +        if (flowFiles.isEmpty()) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get().toArray(new CellProcessor[this.processors.get().size()]);
    --- End diff --
    
    Can this be done in onScheduled? I feel there is a decent amount of allocation and processing here that could be reused.
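
A minimal sketch of the idea behind this comment, under stated assumptions: it uses plain Java with no NiFi or super-csv dependency, `CachedSchemaSketch` and its methods are hypothetical stand-ins, and a `String[]` takes the place of the `CellProcessor[]` array. The point is only that the list-to-array conversion happens once when the component is scheduled, and each trigger reads the cached result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class CachedSchemaSketch {

    // Stand-in for the cached CellProcessor[] (plain strings in this sketch).
    private final AtomicReference<String[]> processors = new AtomicReference<>();
    private int conversions = 0;

    // In a real processor this method would carry @OnScheduled:
    // it runs once per start, not once per trigger.
    void onScheduled(String schema) {
        List<String> parsed = new ArrayList<>();
        for (String token : schema.split(",")) {
            parsed.add(token.trim());
        }
        conversions++;
        processors.set(parsed.toArray(new String[0]));
    }

    // Stand-in for onTrigger: no allocation, only a read of the cached array.
    int onTrigger() {
        return processors.get().length;
    }

    int conversions() {
        return conversions;
    }

    public static void main(String[] args) {
        CachedSchemaSketch sketch = new CachedSchemaSketch();
        sketch.onScheduled("ParseInt, NotNull, Unique");
        for (int i = 0; i < 3; i++) {
            sketch.onTrigger();
        }
        System.out.println(sketch.conversions()); // prints 1
    }
}
```

Holding the array in an `AtomicReference` keeps the read in the trigger path safe when several threads run the trigger method concurrently.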



[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    @pvillard31, sorry for not realizing this earlier, but a great improvement would be to route the way RouteText does: it can either route the whole FlowFile when it fails, or route each line to the respective valid/invalid destination, with invalid rows carrying an attribute detailing what went wrong.
    
    I believe all that would need to be done is to create a FlowFile for each row as it is read and, if the row fails, add it to the invalid list (along with an attribute describing the failure).
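
The per-line routing suggested here can be sketched roughly as follows. This is a plain-Java illustration under stated assumptions, not the processor's implementation: `LineRoutingSketch`, `Result`, and `route` are hypothetical names, simple `Predicate<String>` checks stand in for super-csv cell processors, and the naive `split` ignores quoting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class LineRoutingSketch {

    // Holds the two output groups plus one failure reason per invalid line.
    static final class Result {
        final List<String> valid = new ArrayList<>();
        final List<String> invalid = new ArrayList<>();
        final List<String> reasons = new ArrayList<>();
    }

    // Checks every line against one predicate per column and partitions the lines.
    static Result route(List<String> lines, List<Predicate<String>> columnChecks) {
        Result result = new Result();
        for (String line : lines) {
            String[] cells = line.split(",", -1); // -1 keeps trailing empty cells
            String reason = null;
            if (cells.length != columnChecks.size()) {
                reason = "expected " + columnChecks.size() + " columns but found " + cells.length;
            } else {
                for (int i = 0; i < cells.length; i++) {
                    if (!columnChecks.get(i).test(cells[i])) {
                        reason = "column " + (i + 1) + " failed validation";
                        break;
                    }
                }
            }
            if (reason == null) {
                result.valid.add(line);
            } else {
                result.invalid.add(line);
                result.reasons.add(reason); // would become an attribute on the invalid output
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Predicate<String>> checks = new ArrayList<>();
        checks.add(s -> s.matches("\\d+")); // first column: digits only
        checks.add(s -> !s.isEmpty());      // second column: not empty
        List<String> lines = new ArrayList<>();
        lines.add("1,alice");
        lines.add("x,bob");
        lines.add("3,");
        Result r = route(lines, checks);
        System.out.println(r.valid + " / " + r.invalid + " / " + r.reasons);
    }
}
```

Stateless checks like these give the same answer whether a file is validated as a whole or line by line. Checks that remember earlier rows, such as uniqueness constraints, need more care in a per-line mode.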



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r72544927
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.IsIncludedIn;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode", "IsIncludedIn");
    +
    +    private static final String routeWholeFlowFile = "Route to 'valid' the whole FlowFile if the CSV is valid";
    +    private static final String routeLinesIndividually = "Route to 'valid' a FlowFile containing the valid lines and"
    +            + " to 'invalid' a FlowFile containing the invalid lines";
    +
    +    public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
    +            "In case an error is found in the CSV file, the whole flow file will be routed to the 'invalid' relationship. "
    +                    + "This option offers best performances.");
    +
    +    public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
    +            "In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
    +                    + "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
    +                    + "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition.");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
    +                    + "processors to apply. The following cell processors are allowed in the schema definition: "
    +                    + allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
    +            .name("validate-csv-strategy")
    +            .displayName("Validation strategy")
    +            .description("Strategy to apply when routing input files to output relationships.")
    +            .required(true)
    +            .defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
    +            .allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        properties.add(VALIDATION_STRATEGY);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            case "isincludedin":
    +                String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
    +                return new IsIncludedIn(elements);
    +
    +            default:
    +                throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +        final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Boolean> isFirstLine = new AtomicReference<Boolean>(true);
    +        final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
    +        final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
    +        final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
    +
    +        if(!isWholeFFValidation) {
    +            invalidFF.set(session.create(flowFile));
    +            validFF.set(session.create(flowFile));
    +        }
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                NifiCsvListReader listReader = null;
    +                try {
    +                    listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
    +
    +                    // handling of header
    +                    if(header) {
    +                        List<String> headerList = listReader.read();
    +                        if(!isWholeFFValidation) {
    +                            invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                @Override
    +                                public void process(OutputStream out) throws IOException {
    +                                    out.write(print(headerList, csvPref, isFirstLine.get()));
    +                                }
    +                            }));
    +                            isFirstLine.set(false);
    +                        }
    +                    }
    +
    +                    boolean stop = false;
    +
    +                    while (!stop) {
    +                        try {
    +
    +                            final List<Object> list = listReader.read(cellProcs);
    +                            stop = list == null;
    +
    +                            if(!isWholeFFValidation && !stop) {
    +                                validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(list, csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +                                okCount.set(okCount.get() + 1);
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    +                                }
    +                            }
    +
    +                        } catch (final SuperCsvCellProcessorException e) {
    +                            valid.set(false);
    +                            if(isWholeFFValidation) {
    +                                logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
    +                                break;
    +                            } else {
    +                                // we append the invalid line to the flow file that will be routed to invalid relationship
    +                                invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
    +                                    @Override
    +                                    public void process(OutputStream out) throws IOException {
    +                                        out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLine.get()));
    +                                    }
    +                                }));
    +
    +                                if(isFirstLine.get()) {
    +                                    isFirstLine.set(false);
    --- End diff --
    
    Currently the valid and invalid FlowFiles share the same "isFirstLine" flag. So if the first row is valid and the second is invalid, the invalid FlowFile will start with the new line character(s).
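
    One possible way to address this, shown only as a minimal sketch and not as the PR's actual fix: give each output its own first-line flag, so the end-of-line separator is written only between rows of the same output. The class name `FirstLineFlagSketch`, the helper `RowSink`, and the "first field is an integer" rule are all hypothetical stand-ins for the FlowFile appends and the cell processors in the diff above.

```java
import java.util.Arrays;
import java.util.List;

public class FirstLineFlagSketch {

    /** Hypothetical stand-in for one output FlowFile's content. */
    static final class RowSink {
        private final StringBuilder content = new StringBuilder();
        private final String eol;
        private boolean firstLine = true; // tracked per sink, never shared

        RowSink(String eol) {
            this.eol = eol;
        }

        void append(String row) {
            // Write the separator only between rows of this same sink.
            if (!firstLine) {
                content.append(eol);
            }
            content.append(row);
            firstLine = false;
        }

        String content() {
            return content.toString();
        }
    }

    /** Splits rows into { validContent, invalidContent }. */
    static String[] split(List<String> rows, String eol) {
        RowSink valid = new RowSink(eol);
        RowSink invalid = new RowSink(eol);
        for (String row : rows) {
            // Placeholder validation rule (assumption): first field must be an integer.
            if (row.split(",")[0].matches("-?\\d+")) {
                valid.append(row);
            } else {
                invalid.append(row);
            }
        }
        return new String[] { valid.content(), invalid.content() };
    }

    public static void main(String[] args) {
        String[] out = split(Arrays.asList("1,apple", "x,pear", "2,plum"), "\n");
        System.out.println("valid:   [" + out[0].replace("\n", "\\n") + "]");
        System.out.println("invalid: [" + out[1].replace("\n", "\\n") + "]");
    }
}
```

    With a single shared flag, the first invalid row in this example would be written after the flag had already been cleared by the first valid row, so the invalid content would begin with the separator. Keeping one flag per output avoids that.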



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r71088438
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,433 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
    +        "Take a look at the additional documentation of this processor for some schema examples.")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +            "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +            "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +            "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. It is expected to be a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-header")
    +            .displayName("Header")
    +            .description("True if the incoming flow file contains a header to ignore, false otherwise.")
    +            .required(true)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-quote")
    +            .displayName("Quote character")
    +            .description("Character used as 'quote' in the incoming data. Example: \"")
    +            .required(true)
    +            .defaultValue("\"")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-delimiter")
    +            .displayName("Delimiter character")
    +            .description("Character used as 'delimiter' in the incoming data. Example: ,")
    +            .required(true)
    +            .defaultValue(",")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
    +            .name("validate-csv-eol")
    +            .displayName("End of line symbols")
    +            .description("Symbols used as 'end of line' in the incoming data. Example: \\n")
    +            .required(true)
    +            .defaultValue("\\n")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_VALID = new Relationship.Builder()
    +            .name("valid")
    +            .description("FlowFiles that are successfully validated against the schema are routed to this relationship")
    +            .build();
    +    public static final Relationship REL_INVALID = new Relationship.Builder()
    +            .name("invalid")
    +            .description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +    private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
    +    private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SCHEMA);
    +        properties.add(HEADER);
    +        properties.add(DELIMITER_CHARACTER);
    +        properties.add(QUOTE_CHARACTER);
    +        properties.add(END_OF_LINE_CHARACTER);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_VALID);
    +        relationships.add(REL_INVALID);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        String schema = validationContext.getProperty(SCHEMA).getValue();
    +        try {
    +            this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
    +        } catch (Exception e) {
    +            final List<ValidationResult> problems = new ArrayList<>(1);
    +            problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
    +                    .input(schema)
    +                    .valid(false)
    +                    .explanation("Error while parsing the schema: " + e.getMessage())
    +                    .build());
    +            return problems;
    +        }
    +        return super.customValidate(validationContext);
    +    }
    +
    +    @OnScheduled
    +    public void setPreference(final ProcessContext context) {
    +        this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
    +                context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0),
    +                context.getProperty(END_OF_LINE_CHARACTER).getValue()).build());
    +    }
    +
    +    /**
    +     * Method used to parse the string supplied by the user. The string is converted
    +     * to a list of cell processors used to validate the CSV data.
    +     * @param schema Schema to parse
    +     */
    +    private void parseSchema(String schema) {
    +        List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
    +
    +        String remaining = schema;
    +        while(remaining.length() > 0) {
    +            remaining = setProcessor(remaining, processorsList);
    +        }
    +
    +        this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
    +    }
    +
    +    private String setProcessor(String remaining, List<CellProcessor> processorsList) {
    +        StringBuffer buffer = new StringBuffer();
    +        int i = 0;
    +        int opening = 0;
    +        int closing = 0;
    +        while(buffer.length() != remaining.length()) {
    +            char c = remaining.charAt(i);
    +            i++;
    +
    +            if(opening == 0 && c == ',') {
    +                if(i == 1) {
    +                    continue;
    +                }
    +                break;
    +            }
    +
    +            buffer.append(c);
    +
    +            if(c == '(') {
    +                opening++;
    +            } else if(c == ')') {
    +                closing++;
    +            }
    +
    +            if(opening > 0 && opening == closing) {
    +                break;
    +            }
    +        }
    +
    +        final String procString = buffer.toString().trim();
    +        opening = procString.indexOf('(');
    +        String method = procString;
    +        String argument = null;
    +        if(opening != -1) {
    +            argument = method.substring(opening + 1, method.length() - 1);
    +            method = method.substring(0, opening);
    +        }
    +
    +        processorsList.add(getProcessor(method.toLowerCase(), argument));
    +
    +        return remaining.substring(i);
    +    }
    +
    +    private CellProcessor getProcessor(String method, String argument) {
    +        switch (method) {
    +
    +            case "optional":
    +                int opening = argument.indexOf('(');
    +                String subMethod = argument;
    +                String subArgument = null;
    +                if(opening != -1) {
    +                    subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
    +                    subMethod = subMethod.substring(0, opening);
    +                }
    +                return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
    +
    +            case "parsedate":
    +                return new ParseDate(argument.substring(1, argument.length() - 1));
    +
    +            case "parsedouble":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
    +                return new ParseDouble();
    +
    +            case "parsebigdecimal":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
    +                return new ParseBigDecimal();
    +
    +            case "parsebool":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
    +                return new ParseBool();
    +
    +            case "parsechar":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
    +                return new ParseChar();
    +
    +            case "parseint":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
    +                return new ParseInt();
    +
    +            case "parselong":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
    +                return new ParseLong();
    +
    +            case "notnull":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
    +                return new NotNull();
    +
    +            case "strregex":
    +                return new StrRegEx(argument.substring(1, argument.length() - 1));
    +
    +            case "unique":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
    +                return new Unique();
    +
    +            case "uniquehashcode":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
    +                return new UniqueHashCode();
    +
    +            case "strlen":
    +                String[] splts = argument.split(",");
    +                int[] requiredLengths = new int[splts.length];
    +                for(int i = 0; i < splts.length; i++) {
    +                    requiredLengths[i] = Integer.parseInt(splts[i]);
    +                }
    +                return new Strlen(requiredLengths);
    +
    +            case "strminmax":
    +                String[] splits = argument.split(",");
    +                return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
    +
    +            case "lminmax":
    +                String[] args = argument.split(",");
    +                return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
    +
    +            case "dminmax":
    +                String[] doubles = argument.split(",");
    +                return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
    +
    +            case "equals":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
    +                return new Equals();
    +
    +            case "forbidsubstr":
    +                String[] forbiddenSubStrings = argument.split(",");
    +                for (int i = 0; i < forbiddenSubStrings.length; i++) {
    +                    forbiddenSubStrings[i] = forbiddenSubStrings[i].substring(1, forbiddenSubStrings[i].length() - 1);
    +                }
    +                return new ForbidSubStr(forbiddenSubStrings);
    +
    +            case "requiresubstr":
    +                String[] requiredSubStrings = argument.split(",");
    +                for (int i = 0; i < requiredSubStrings.length; i++) {
    +                    requiredSubStrings[i] = requiredSubStrings[i].substring(1, requiredSubStrings[i].length() - 1);
    +                }
    +                return new RequireSubStr(requiredSubStrings);
    +
    +            case "strnotnullorempty":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
    +                return new StrNotNullOrEmpty();
    +
    +            case "requirehashcode":
    +                String[] hashs = argument.split(",");
    +                int[] hashcodes = new int[hashs.length];
    +                for(int i = 0; i < hashs.length; i++) {
    +                    hashcodes[i] = Integer.parseInt(hashs[i]);
    +                }
    +                return new RequireHashCode(hashcodes);
    +
    +            case "null":
    +                if(argument != null && !argument.isEmpty())
    +                    throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
    +                return null;
    +
    +            default:
    +                throw new IllegalArgumentException(method + " is not an allowed method to define a Cell Processor");
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final CsvPreference csvPref = this.preference.get();
    +        final boolean header = context.getProperty(HEADER).asBoolean();
    +        final ComponentLog logger = getLogger();
    +        final CellProcessor[] cellProcs = this.processors.get();
    +
    +        final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
    +
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                CsvListReader listReader = null;
    +                try {
    +                    listReader = new CsvListReader(new InputStreamReader(in), csvPref);
    +                    if(header) {
    +                        listReader.read();
    +                    }
    +                    while(listReader.read(cellProcs) != null) {}
    +                } catch (final IOException | SuperCsvCellProcessorException e) {
    --- End diff --
    
    This should break IOException and SuperCsvCellProcessorException into separate catch blocks. A SuperCsvCellProcessorException should be logged at DEBUG because it is thrown for invalid files as a normal part of this processor's operation.
    
    For example, I used your template with the schema "Null, ParseDate("dd/MM/yyyy"), Optional(ParseDouble())" and set the second column to "22/111954". The processor failed to parse the value and correctly routed the FlowFile to invalid, but it logged an ERROR bulletin.
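
A minimal sketch of the suggested split, not the code from the PR. To keep it self-contained it does not use NiFi or super-csv: `CellValidationException` is a hypothetical stand-in for `SuperCsvCellProcessorException`, `RowSource` stands in for the `listReader.read(cellProcs)` loop, and the `LOG` list stands in for the component logger. Treating an `IOException` as an ERROR that also marks the file invalid is an assumption made for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.supercsv.exception.SuperCsvCellProcessorException.
class CellValidationException extends RuntimeException {
    CellValidationException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the reader loop; returns false at end of input.
interface RowSource {
    boolean readRow() throws IOException;
}

final class SplitCatchSketch {
    // Collects "LEVEL: message" entries instead of calling a real logger.
    static final List<String> LOG = new ArrayList<>();

    static boolean isValid(RowSource source) {
        try {
            while (source.readRow()) {
                // Each successful read means the row passed every cell processor.
            }
            return true;
        } catch (CellValidationException e) {
            // Expected for invalid files, so it is only logged at DEBUG.
            LOG.add("DEBUG: " + e.getMessage());
            return false;
        } catch (IOException e) {
            // An unexpected read failure, so it is logged at ERROR (assumption).
            LOG.add("ERROR: " + e.getMessage());
            return false;
        }
    }
}
```

With this shape, a value such as "22/111954" in a ParseDate column would still route the file to invalid, but without raising an ERROR bulletin.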



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70715716
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    --- End diff --
    
    Ah duh, I should have just read the additional docs portion. A comment here reminding users to check out the additional details would suffice.



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70718112
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. It is expected to be a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    Is there any way to do some validation here? I can see there being some pretty complicated schemas and a missing comma could lead to a lot of user headaches without any validation.
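
A rough sketch of what such a check could look like, assuming the schema is a comma-delimited list of `Name` or `Name(args)` entries as in the property description. This is not the validation implemented in the PR: the class `SchemaCheckSketch` and its methods are hypothetical, the name comparison being case-insensitive is an assumption, and only the processor names come from the `allowedOperators` list in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SchemaCheckSketch {
    // Names copied from the allowedOperators list in the diff.
    static final List<String> ALLOWED = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar",
            "ParseDate", "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals",
            "ForbidSubStr", "LMinMax", "NotNull", "Null", "RequireHashCode", "RequireSubStr", "Strlen",
            "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique", "UniqueHashCode");

    // Splits on commas that sit outside parentheses and double quotes.
    static List<String> splitTopLevel(String schema) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        boolean inQuotes = false;
        for (char c : schema.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (!inQuotes && c == '(') {
                depth++;
            } else if (!inQuotes && c == ')') {
                depth--;
            }
            if (c == ',' && depth == 0 && !inQuotes) {
                parts.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString().trim());
        return parts;
    }

    // Returns null when every entry looks well formed, otherwise an explanation.
    static String check(String schema) {
        for (String part : splitTopLevel(schema)) {
            int paren = part.indexOf('(');
            String name = (paren < 0 ? part : part.substring(0, paren)).trim();
            if (ALLOWED.stream().noneMatch(a -> a.equalsIgnoreCase(name))) {
                return "Unknown cell processor: '" + part + "'";
            }
            if (paren < 0) {
                continue;
            }
            // The parenthesis opened after the name must close on the last character.
            int depth = 0;
            int close = -1;
            boolean inQuotes = false;
            for (int i = paren; i < part.length(); i++) {
                char c = part.charAt(i);
                if (c == '"') {
                    inQuotes = !inQuotes;
                } else if (!inQuotes && c == '(') {
                    depth++;
                } else if (!inQuotes && c == ')' && --depth == 0) {
                    close = i;
                    break;
                }
            }
            if (close != part.length() - 1) {
                return "Malformed arguments in: '" + part + "'";
            }
        }
        return null;
    }
}
```

Under these assumptions, `check("ParseInt ParseLong")` reports an unknown cell processor, which is the kind of missing-comma mistake a user would otherwise only discover at run time.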



[GitHub] nifi pull request #476: NIFI-1942 Processor to validate CSV against user-sup...

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/476#discussion_r70718523
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java ---
    @@ -0,0 +1,408 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBigDecimal;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.ForbidSubStr;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.NotNull;
    +import org.supercsv.cellprocessor.constraint.RequireHashCode;
    +import org.supercsv.cellprocessor.constraint.RequireSubStr;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.constraint.Unique;
    +import org.supercsv.cellprocessor.constraint.UniqueHashCode;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvCellProcessorException;
    +import org.supercsv.io.CsvListReader;
    +import org.supercsv.prefs.CsvPreference;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"csv", "schema", "validation"})
    +@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema")
    +public class ValidateCsv extends AbstractProcessor {
    +
    +    private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
    +        "ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
    +        "RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
    +        "UniqueHashCode");
    +
    +    public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
    +            .name("validate-csv-schema")
    +            .displayName("Schema")
    +            .description("The schema to be used for validation. It is expected to be a comma-delimited string representing " +
    +                    "the cell processors to apply. The following cell processors are allowed in the schema definition: " +
    +                    allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    --- End diff --
    
    -_- I really need to double-check the commit before commenting. I see you already put this validation in the custom validate method.



[GitHub] nifi issue #476: NIFI-1942 Processor to validate CSV against user-supplied s...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/476
  
    Hey @JPercivall, that should be OK now, thanks!

